You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/08/17 05:25:18 UTC

[flink] branch release-1.6 updated (3f3736f -> decc0bf)

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a change to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 3f3736f  [FLINK-10116] [types] Fix getTotalFields() implementation of some TypeInfos.
     new 0949db1  [FLINK-9899] Add more ShardConsumer metrics
     new decc0bf  [FLINK-10020] [kinesis] Support recoverable exceptions in listShards.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/monitoring/metrics.md                         | 65 +++++++++++++++
 .../kinesis/config/ConsumerConfigConstants.java    |  7 +-
 .../kinesis/internals/KinesisDataFetcher.java      |  9 ++-
 .../kinesis/internals/ShardConsumer.java           | 18 ++++-
 .../metrics/KinesisConsumerMetricConstants.java    |  9 +++
 .../kinesis/metrics/ShardMetricsReporter.java      | 72 +++++++++++++++++
 .../connectors/kinesis/proxy/KinesisProxy.java     | 53 ++++++++----
 .../connectors/kinesis/proxy/KinesisProxyTest.java | 94 ++++++++++++++++++++++
 8 files changed, 306 insertions(+), 21 deletions(-)


[flink] 01/02: [FLINK-9899] Add more ShardConsumer metrics

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0949db1197dba1b86f49d7665a720bcd3f608098
Author: Lakshmi Gururaja Rao <gl...@gmail.com>
AuthorDate: Tue Jul 24 14:13:53 2018 -0700

    [FLINK-9899] Add more ShardConsumer metrics
    
    This closes #6409.
---
 docs/monitoring/metrics.md                         | 65 +++++++++++++++++++
 .../kinesis/internals/KinesisDataFetcher.java      |  9 ++-
 .../kinesis/internals/ShardConsumer.java           | 18 +++++-
 .../metrics/KinesisConsumerMetricConstants.java    |  9 +++
 .../kinesis/metrics/ShardMetricsReporter.java      | 72 ++++++++++++++++++++++
 5 files changed, 170 insertions(+), 3 deletions(-)

diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 8be5878..89a524c 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -1424,6 +1424,71 @@ Thus, in order to infer the metric identifier:
       </td>
       <td>Gauge</td>
     </tr>
+    <tr>
+      <th rowspan="1">Operator</th>
+      <td>sleepTimeMillis</td>
+      <td>stream, shardId</td>
+      <td>The number of milliseconds the consumer spends sleeping before fetching records from Kinesis.
+      A particular shard's metric can be specified by stream name and shard id.
+      </td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <th rowspan="1">Operator</th>
+      <td>maxNumberOfRecordsPerFetch</td>
+      <td>stream, shardId</td>
+      <td>The maximum number of records requested by the consumer in a single getRecords call to Kinesis. If ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS
+      is set to true, this value is adaptively calculated to make full use of the 2 MB/s per-shard read limit of Kinesis.
+      </td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <th rowspan="1">Operator</th>
+      <td>numberOfAggregatedRecordsPerFetch</td>
+      <td>stream, shardId</td>
+      <td>The number of aggregated Kinesis records fetched by the consumer in a single getRecords call to Kinesis.
+      </td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <th rowspan="1">Operator</th>
+      <td>numberOfDeaggregatedRecordsPerFetch</td>
+      <td>stream, shardId</td>
+      <td>The number of deaggregated Kinesis records fetched by the consumer in a single getRecords call to Kinesis.
+      </td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <th rowspan="1">Operator</th>
+      <td>averageRecordSizeBytes</td>
+      <td>stream, shardId</td>
+      <td>The average size of a Kinesis record in bytes, fetched by the consumer in a single getRecords call.
+      </td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <th rowspan="1">Operator</th>
+      <td>runLoopTimeNanos</td>
+      <td>stream, shardId</td>
+      <td>The actual time taken, in nanoseconds, by the consumer in the run loop.
+      </td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <th rowspan="1">Operator</th>
+      <td>loopFrequencyHz</td>
+      <td>stream, shardId</td>
+      <td>The number of calls to getRecords in one second. 
+      </td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <th rowspan="1">Operator</th>
+      <td>bytesRequestedPerFetch</td>
+      <td>stream, shardId</td>
+      <td>The bytes requested (2 MB/s / loopFrequencyHz) in a single call to getRecords.</td>
+      <td>Gauge</td>
+    </tr>
   </tbody>
 </table>
 
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 13de032..0981b76 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -630,7 +630,14 @@ public class KinesisDataFetcher<T> {
 				shardState.getStreamShardHandle().getShard().getShardId());
 
 		streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.MILLIS_BEHIND_LATEST_GAUGE, shardMetrics::getMillisBehindLatest);
-
+		streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.MAX_RECORDS_PER_FETCH, shardMetrics::getMaxNumberOfRecordsPerFetch);
+		streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.NUM_AGGREGATED_RECORDS_PER_FETCH, shardMetrics::getNumberOfAggregatedRecords);
+		streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.NUM_DEAGGREGATED_RECORDS_PER_FETCH, shardMetrics::getNumberOfDeaggregatedRecords);
+		streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.AVG_RECORD_SIZE_BYTES, shardMetrics::getAverageRecordSizeBytes);
+		streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.BYTES_PER_READ, shardMetrics::getBytesPerRead);
+		streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.RUNTIME_LOOP_NANOS, shardMetrics::getRunLoopTimeNanos);
+		streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.LOOP_FREQUENCY_HZ, shardMetrics::getLoopFrequencyHz);
+		streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.SLEEP_TIME_MILLIS, shardMetrics::getSleepTimeMillis);
 		return shardMetrics;
 	}
 
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index 6de7278..5845eea 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -212,12 +212,16 @@ public class ShardConsumer<T> implements Runnable {
 					// we can close this consumer thread once we've reached the end of the subscribed shard
 					break;
 				} else {
-
+					shardMetricsReporter.setMaxNumberOfRecordsPerFetch(maxNumberOfRecordsPerFetch);
 					GetRecordsResult getRecordsResult = getRecords(nextShardItr, maxNumberOfRecordsPerFetch);
 
+					List<Record> aggregatedRecords = getRecordsResult.getRecords();
+					int numberOfAggregatedRecords = aggregatedRecords.size();
+					shardMetricsReporter.setNumberOfAggregatedRecords(numberOfAggregatedRecords);
+
 					// each of the Kinesis records may be aggregated, so we must deaggregate them before proceeding
 					List<UserRecord> fetchedRecords = deaggregateRecords(
-						getRecordsResult.getRecords(),
+						aggregatedRecords,
 						subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
 						subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
 
@@ -227,11 +231,15 @@ public class ShardConsumer<T> implements Runnable {
 						deserializeRecordForCollectionAndUpdateState(record);
 					}
 
+					int numberOfDeaggregatedRecords = fetchedRecords.size();
+					shardMetricsReporter.setNumberOfDeaggregatedRecords(numberOfDeaggregatedRecords);
+
 					nextShardItr = getRecordsResult.getNextShardIterator();
 
 					long adjustmentEndTimeNanos = adjustRunLoopFrequency(processingStartTimeNanos, System.nanoTime());
 					long runLoopTimeNanos = adjustmentEndTimeNanos - processingStartTimeNanos;
 					maxNumberOfRecordsPerFetch = adaptRecordsToRead(runLoopTimeNanos, fetchedRecords.size(), recordBatchSizeBytes, maxNumberOfRecordsPerFetch);
+					shardMetricsReporter.setRunLoopTimeNanos(runLoopTimeNanos);
 					processingStartTimeNanos = adjustmentEndTimeNanos; // for next time through the loop
 				}
 			}
@@ -256,6 +264,7 @@ public class ShardConsumer<T> implements Runnable {
 			if (sleepTimeMillis > 0) {
 				Thread.sleep(sleepTimeMillis);
 				endTimeNanos = System.nanoTime();
+				shardMetricsReporter.setSleepTimeMillis(sleepTimeMillis);
 			}
 		}
 		return endTimeNanos;
@@ -280,6 +289,11 @@ public class ShardConsumer<T> implements Runnable {
 			maxNumberOfRecordsPerFetch = (int) (bytesPerRead / averageRecordSizeBytes);
 			// Ensure the value is greater than 0 and not more than 10000L
 			maxNumberOfRecordsPerFetch = Math.max(1, Math.min(maxNumberOfRecordsPerFetch, ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX));
+
+			// Set metrics
+			shardMetricsReporter.setAverageRecordSizeBytes(averageRecordSizeBytes);
+			shardMetricsReporter.setLoopFrequencyHz(loopFrequencyHz);
+			shardMetricsReporter.setBytesPerRead(bytesPerRead);
 		}
 		return maxNumberOfRecordsPerFetch;
 	}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/KinesisConsumerMetricConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/KinesisConsumerMetricConstants.java
index 1b83f16..e850d25 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/KinesisConsumerMetricConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/KinesisConsumerMetricConstants.java
@@ -34,4 +34,13 @@ public class KinesisConsumerMetricConstants {
 	public static final String SHARD_METRICS_GROUP = "shardId";
 
 	public static final String MILLIS_BEHIND_LATEST_GAUGE = "millisBehindLatest";
+	public static final String SLEEP_TIME_MILLIS = "sleepTimeMillis";
+	public static final String MAX_RECORDS_PER_FETCH = "maxNumberOfRecordsPerFetch";
+	public static final String NUM_AGGREGATED_RECORDS_PER_FETCH = "numberOfAggregatedRecordsPerFetch";
+	public static final String NUM_DEAGGREGATED_RECORDS_PER_FETCH = "numberOfDeaggregatedRecordsPerFetch";
+	public static final String AVG_RECORD_SIZE_BYTES = "averageRecordSizeBytes";
+	public static final String RUNTIME_LOOP_NANOS = "runLoopTimeNanos";
+	public static final String LOOP_FREQUENCY_HZ = "loopFrequencyHz";
+	public static final String BYTES_PER_READ = "bytesRequestedPerFetch";
+
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java
index 2b6a491..4a27b9c 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java
@@ -28,6 +28,14 @@ import org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer;
 public class ShardMetricsReporter {
 
 	private volatile long millisBehindLatest = -1;
+	private volatile double loopFrequencyHz = 0.0;
+	private volatile double bytesPerRead = 0.0;
+	private volatile long runLoopTimeNanos = 0L;
+	private volatile long averageRecordSizeBytes = 0L;
+	private volatile long sleepTimeMillis = 0L;
+	private volatile int numberOfAggregatedRecords = 0;
+	private volatile int numberOfDeaggregatedRecords = 0;
+	private volatile int maxNumberOfRecordsPerFetch = 0;
 
 	public long getMillisBehindLatest() {
 		return millisBehindLatest;
@@ -37,4 +45,68 @@ public class ShardMetricsReporter {
 		this.millisBehindLatest = millisBehindLatest;
 	}
 
+	public double getLoopFrequencyHz() {
+		return loopFrequencyHz;
+	}
+
+	public void setLoopFrequencyHz(double loopFrequencyHz) {
+		this.loopFrequencyHz = loopFrequencyHz;
+	}
+
+	public double getBytesPerRead() {
+		return bytesPerRead;
+	}
+
+	public void setBytesPerRead(double bytesPerRead) {
+		this.bytesPerRead = bytesPerRead;
+	}
+
+	public long getRunLoopTimeNanos() {
+		return runLoopTimeNanos;
+	}
+
+	public void setRunLoopTimeNanos(long runLoopTimeNanos) {
+		this.runLoopTimeNanos = runLoopTimeNanos;
+	}
+
+	public long getAverageRecordSizeBytes() {
+		return averageRecordSizeBytes;
+	}
+
+	public void setAverageRecordSizeBytes(long averageRecordSizeBytes) {
+		this.averageRecordSizeBytes = averageRecordSizeBytes;
+	}
+
+	public long getSleepTimeMillis() {
+		return sleepTimeMillis;
+	}
+
+	public void setSleepTimeMillis(long sleepTimeMillis) {
+		this.sleepTimeMillis = sleepTimeMillis;
+	}
+
+	public int getNumberOfAggregatedRecords() {
+		return numberOfAggregatedRecords;
+	}
+
+	public void setNumberOfAggregatedRecords(int numberOfAggregatedRecords) {
+		this.numberOfAggregatedRecords = numberOfAggregatedRecords;
+	}
+
+	public int getNumberOfDeaggregatedRecords() {
+		return numberOfDeaggregatedRecords;
+	}
+
+	public void setNumberOfDeaggregatedRecords(int numberOfDeaggregatedRecords) {
+		this.numberOfDeaggregatedRecords = numberOfDeaggregatedRecords;
+	}
+
+	public int getMaxNumberOfRecordsPerFetch() {
+		return maxNumberOfRecordsPerFetch;
+	}
+
+	public void setMaxNumberOfRecordsPerFetch(int maxNumberOfRecordsPerFetch) {
+		this.maxNumberOfRecordsPerFetch = maxNumberOfRecordsPerFetch;
+	}
+
 }


[flink] 02/02: [FLINK-10020] [kinesis] Support recoverable exceptions in listShards.

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit decc0bf2404d6b3572ff1bb4ee2839e11777d258
Author: Thomas Weise <th...@apache.org>
AuthorDate: Thu Aug 2 17:47:34 2018 -0700

    [FLINK-10020] [kinesis] Support recoverable exceptions in listShards.
    
    This closes #6482.
---
 .../kinesis/config/ConsumerConfigConstants.java    |  7 +-
 .../connectors/kinesis/proxy/KinesisProxy.java     | 53 ++++++++----
 .../connectors/kinesis/proxy/KinesisProxyTest.java | 94 ++++++++++++++++++++++
 3 files changed, 136 insertions(+), 18 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index 48a0b3c..443b19e 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -92,6 +92,9 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
 	/** The power constant for exponential backoff between each describeStream attempt. */
 	public static final String STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = "flink.stream.describe.backoff.expconst";
 
+	/** The maximum number of listShards retries after a recoverable exception. */
+	public static final String LIST_SHARDS_RETRIES = "flink.list.shards.maxretries";
+
 	/** The base backoff time between each listShards attempt. */
 	public static final String LIST_SHARDS_BACKOFF_BASE = "flink.list.shards.backoff.base";
 
@@ -104,7 +107,7 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
 	/** The maximum number of records to try to get each time we fetch records from a AWS Kinesis shard. */
 	public static final String SHARD_GETRECORDS_MAX = "flink.shard.getrecords.maxrecordcount";
 
-	/** The maximum number of getRecords attempts if we get ProvisionedThroughputExceededException. */
+	/** The maximum number of getRecords retries after a recoverable exception. */
 	public static final String SHARD_GETRECORDS_RETRIES = "flink.shard.getrecords.maxretries";
 
 	/** The base backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException. */
@@ -161,6 +164,8 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
 
 	public static final double DEFAULT_LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
 
+	public static final int DEFAULT_LIST_SHARDS_RETRIES = 10;
+
 	public static final int DEFAULT_SHARD_GETRECORDS_MAX = 10000;
 
 	public static final int DEFAULT_SHARD_GETRECORDS_RETRIES = 3;
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 7e6a360..262181a 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -91,6 +91,9 @@ public class KinesisProxy implements KinesisProxyInterface {
 	/** Exponential backoff power constant for the list shards operation. */
 	private final double listShardsExpConstant;
 
+	/** Maximum retry attempts for the list shards operation. */
+	private final int listShardsMaxRetries;
+
 	// ------------------------------------------------------------------------
 	//  getRecords() related performance settings
 	// ------------------------------------------------------------------------
@@ -104,8 +107,8 @@ public class KinesisProxy implements KinesisProxyInterface {
 	/** Exponential backoff power constant for the get records operation. */
 	private final double getRecordsExpConstant;
 
-	/** Maximum attempts for the get records operation. */
-	private final int getRecordsMaxAttempts;
+	/** Maximum retry attempts for the get records operation. */
+	private final int getRecordsMaxRetries;
 
 	// ------------------------------------------------------------------------
 	//  getShardIterator() related performance settings
@@ -120,8 +123,8 @@ public class KinesisProxy implements KinesisProxyInterface {
 	/** Exponential backoff power constant for the get shard iterator operation. */
 	private final double getShardIteratorExpConstant;
 
-	/** Maximum attempts for the get shard iterator operation. */
-	private final int getShardIteratorMaxAttempts;
+	/** Maximum retry attempts for the get shard iterator operation. */
+	private final int getShardIteratorMaxRetries;
 
 	/**
 	 * Create a new KinesisProxy based on the supplied configuration properties.
@@ -146,6 +149,10 @@ public class KinesisProxy implements KinesisProxyInterface {
 			configProps.getProperty(
 				ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT,
 				Double.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT)));
+		this.listShardsMaxRetries = Integer.valueOf(
+			configProps.getProperty(
+				ConsumerConfigConstants.LIST_SHARDS_RETRIES,
+				Long.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_RETRIES)));
 
 		this.getRecordsBaseBackoffMillis = Long.valueOf(
 			configProps.getProperty(
@@ -159,7 +166,7 @@ public class KinesisProxy implements KinesisProxyInterface {
 			configProps.getProperty(
 				ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT,
 				Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT)));
-		this.getRecordsMaxAttempts = Integer.valueOf(
+		this.getRecordsMaxRetries = Integer.valueOf(
 			configProps.getProperty(
 				ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES,
 				Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_RETRIES)));
@@ -176,7 +183,7 @@ public class KinesisProxy implements KinesisProxyInterface {
 			configProps.getProperty(
 				ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT,
 				Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT)));
-		this.getShardIteratorMaxAttempts = Integer.valueOf(
+		this.getShardIteratorMaxRetries = Integer.valueOf(
 			configProps.getProperty(
 				ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES,
 				Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_RETRIES)));
@@ -217,14 +224,14 @@ public class KinesisProxy implements KinesisProxyInterface {
 
 		GetRecordsResult getRecordsResult = null;
 
-		int attempt = 0;
-		while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) {
+		int retryCount = 0;
+		while (retryCount <= getRecordsMaxRetries && getRecordsResult == null) {
 			try {
 				getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
 			} catch (SdkClientException ex) {
 				if (isRecoverableSdkClientException(ex)) {
 					long backoffMillis = fullJitterBackoff(
-						getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
+						getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, retryCount++);
 					LOG.warn("Got recoverable SdkClientException. Backing off for "
 						+ backoffMillis + " millis (" + ex.getMessage() + ")");
 					Thread.sleep(backoffMillis);
@@ -235,7 +242,7 @@ public class KinesisProxy implements KinesisProxyInterface {
 		}
 
 		if (getRecordsResult == null) {
-			throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxAttempts +
+			throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxRetries +
 				" retry attempts returned ProvisionedThroughputExceededException.");
 		}
 
@@ -292,14 +299,14 @@ public class KinesisProxy implements KinesisProxyInterface {
 	private String getShardIterator(GetShardIteratorRequest getShardIteratorRequest) throws InterruptedException {
 		GetShardIteratorResult getShardIteratorResult = null;
 
-		int attempt = 0;
-		while (attempt <= getShardIteratorMaxAttempts && getShardIteratorResult == null) {
+		int retryCount = 0;
+		while (retryCount <= getShardIteratorMaxRetries && getShardIteratorResult == null) {
 			try {
 					getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest);
 			} catch (AmazonServiceException ex) {
 				if (isRecoverableException(ex)) {
 					long backoffMillis = fullJitterBackoff(
-						getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, attempt++);
+						getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, retryCount++);
 					LOG.warn("Got recoverable AmazonServiceException. Backing off for "
 						+ backoffMillis + " millis (" + ex.getErrorMessage() + ")");
 					Thread.sleep(backoffMillis);
@@ -310,7 +317,7 @@ public class KinesisProxy implements KinesisProxyInterface {
 		}
 
 		if (getShardIteratorResult == null) {
-			throw new RuntimeException("Rate Exceeded for getShardIterator operation - all " + getShardIteratorMaxAttempts +
+			throw new RuntimeException("Rate Exceeded for getShardIterator operation - all " + getShardIteratorMaxRetries +
 				" retry attempts returned ProvisionedThroughputExceededException.");
 		}
 		return getShardIteratorResult.getShardIterator();
@@ -406,16 +413,16 @@ public class KinesisProxy implements KinesisProxyInterface {
 		ListShardsResult listShardsResults = null;
 
 		// Call ListShards, with full-jitter backoff (if we get LimitExceededException).
-		int attemptCount = 0;
+		int retryCount = 0;
 		// List Shards returns just the first 1000 shard entries. Make sure that all entries
 		// are taken up.
-		while (listShardsResults == null) { // retry until we get a result
+		while (retryCount <= listShardsMaxRetries && listShardsResults == null) { // retry until we get a result or the retry limit is reached
 			try {
 
 				listShardsResults = kinesisClient.listShards(listShardsRequest);
 			} catch (LimitExceededException le) {
 				long backoffMillis = fullJitterBackoff(
-						listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++);
+						listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, retryCount++);
 					LOG.warn("Got LimitExceededException when listing shards from stream " + streamName
 									+ ". Backing off for " + backoffMillis + " millis.");
 				Thread.sleep(backoffMillis);
@@ -433,6 +440,18 @@ public class KinesisProxy implements KinesisProxyInterface {
 			} catch (ExpiredNextTokenException expiredToken) {
 				LOG.warn("List Shards has an expired token. Reusing the previous state.");
 				break;
+			} catch (SdkClientException ex) {
+				if (retryCount < listShardsMaxRetries && isRecoverableSdkClientException(ex)) {
+					long backoffMillis = fullJitterBackoff(
+						listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, retryCount++);
+					LOG.warn("Got SdkClientException when listing shards from stream {}. Backing off for {} millis.",
+						streamName, backoffMillis);
+					Thread.sleep(backoffMillis);
+				} else {
+					// propagate if retries exceeded or not recoverable
+					// (otherwise would return null result and keep trying forever)
+					throw ex;
+				}
 			}
 		}
 		// Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
index 775ae4b..edf6ceb 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
@@ -27,16 +27,24 @@ import com.amazonaws.AmazonServiceException;
 import com.amazonaws.AmazonServiceException.ErrorType;
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.ClientConfigurationFactory;
+import com.amazonaws.SdkClientException;
 import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.AmazonKinesisException;
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
 import com.amazonaws.services.kinesis.model.ListShardsRequest;
 import com.amazonaws.services.kinesis.model.ListShardsResult;
 import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
 import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.commons.lang3.mutable.MutableInt;
 import org.hamcrest.Description;
 import org.hamcrest.TypeSafeDiagnosingMatcher;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.powermock.reflect.Whitebox;
 
 import java.util.ArrayList;
@@ -54,6 +62,7 @@ import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInA
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -92,6 +101,37 @@ public class KinesisProxyTest {
 	}
 
 	@Test
+	public void testGetRecordsRetry() throws Exception {
+		Properties kinesisConsumerConfig = new Properties();
+		kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+
+		final GetRecordsResult expectedResult = new GetRecordsResult();
+		MutableInt retries = new MutableInt();
+		final Throwable[] retriableExceptions = new Throwable[] {
+			new AmazonKinesisException("mock"),
+		};
+
+		AmazonKinesisClient mockClient = mock(AmazonKinesisClient.class);
+		Mockito.when(mockClient.getRecords(any())).thenAnswer(new Answer<GetRecordsResult>() {
+			@Override
+			public GetRecordsResult answer(InvocationOnMock invocation) throws Throwable{
+				if (retries.intValue() < retriableExceptions.length) {
+					retries.increment();
+					throw retriableExceptions[retries.intValue() - 1];
+				}
+				return expectedResult;
+			}
+		});
+
+		KinesisProxy kinesisProxy = new KinesisProxy(kinesisConsumerConfig);
+		Whitebox.getField(KinesisProxy.class, "kinesisClient").set(kinesisProxy, mockClient);
+
+		GetRecordsResult result = kinesisProxy.getRecords("fakeShardIterator", 1);
+		assertEquals(retriableExceptions.length, retries.intValue());
+		assertEquals(expectedResult, result);
+	}
+
+	@Test
 	public void testGetShardList() throws Exception {
 		List<String> shardIds =
 				Arrays.asList(
@@ -152,6 +192,60 @@ public class KinesisProxyTest {
 	}
 
 	@Test
+	public void testGetShardListRetry() throws Exception {
+		Properties kinesisConsumerConfig = new Properties();
+		kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+
+		Shard shard = new Shard();
+		shard.setShardId("fake-shard-000000000000");
+		final ListShardsResult expectedResult = new ListShardsResult();
+		expectedResult.withShards(shard);
+
+		MutableInt exceptionCount = new MutableInt();
+		final Throwable[] retriableExceptions = new Throwable[]{
+			new AmazonKinesisException("attempt1"),
+			new AmazonKinesisException("attempt2"),
+		};
+
+		AmazonKinesisClient mockClient = mock(AmazonKinesisClient.class);
+		Mockito.when(mockClient.listShards(any())).thenAnswer(new Answer<ListShardsResult>() {
+
+			@Override
+			public ListShardsResult answer(InvocationOnMock invocation) throws Throwable {
+				if (exceptionCount.intValue() < retriableExceptions.length) {
+					exceptionCount.increment();
+					throw retriableExceptions[exceptionCount.intValue() - 1];
+				}
+				return expectedResult;
+			}
+		});
+
+		KinesisProxy kinesisProxy = new KinesisProxy(kinesisConsumerConfig);
+		Whitebox.getField(KinesisProxy.class, "kinesisClient").set(kinesisProxy, mockClient);
+
+		HashMap<String, String> streamNames = new HashMap();
+		streamNames.put("fake-stream", null);
+		GetShardListResult result = kinesisProxy.getShardList(streamNames);
+		assertEquals(retriableExceptions.length, exceptionCount.intValue());
+		assertEquals(true, result.hasRetrievedShards());
+		assertEquals(shard.getShardId(), result.getLastSeenShardOfStream("fake-stream").getShard().getShardId());
+
+		// test max attempt count exceeded
+		int maxRetries = 1;
+		exceptionCount.setValue(0);
+		kinesisConsumerConfig.setProperty(ConsumerConfigConstants.LIST_SHARDS_RETRIES, String.valueOf(maxRetries));
+		kinesisProxy = new KinesisProxy(kinesisConsumerConfig);
+		Whitebox.getField(KinesisProxy.class, "kinesisClient").set(kinesisProxy, mockClient);
+		try {
+			kinesisProxy.getShardList(streamNames);
+			Assert.fail("exception expected");
+		} catch (SdkClientException ex) {
+			assertEquals(retriableExceptions[maxRetries], ex);
+		}
+		assertEquals(maxRetries + 1, exceptionCount.intValue());
+	}
+
+	@Test
 	public void testCustomConfigurationOverride() {
 		Properties configProps = new Properties();
 		configProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");