You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/08/17 05:26:11 UTC

[GitHub] asfgit closed pull request #6482: [FLINK-10020] [kinesis] Support recoverable exceptions in listShards.

asfgit closed pull request #6482: [FLINK-10020] [kinesis] Support recoverable exceptions in listShards.
URL: https://github.com/apache/flink/pull/6482
 
 
   

This is a PR merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of such a pull request once it is
merged, the diff is reproduced below for the sake of provenance:

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index 48a0b3c9559..443b19ec382 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -92,6 +92,9 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
 	/** The power constant for exponential backoff between each describeStream attempt. */
 	public static final String STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = "flink.stream.describe.backoff.expconst";
 
+	/** The maximum number of listShards attempts if we get a recoverable exception. */
+	public static final String LIST_SHARDS_RETRIES = "flink.list.shards.maxretries";
+
 	/** The base backoff time between each listShards attempt. */
 	public static final String LIST_SHARDS_BACKOFF_BASE = "flink.list.shards.backoff.base";
 
@@ -104,7 +107,7 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
 	/** The maximum number of records to try to get each time we fetch records from a AWS Kinesis shard. */
 	public static final String SHARD_GETRECORDS_MAX = "flink.shard.getrecords.maxrecordcount";
 
-	/** The maximum number of getRecords attempts if we get ProvisionedThroughputExceededException. */
+	/** The maximum number of getRecords attempts if we get a recoverable exception. */
 	public static final String SHARD_GETRECORDS_RETRIES = "flink.shard.getrecords.maxretries";
 
 	/** The base backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException. */
@@ -161,6 +164,8 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
 
 	public static final double DEFAULT_LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
 
+	public static final int DEFAULT_LIST_SHARDS_RETRIES = 10;
+
 	public static final int DEFAULT_SHARD_GETRECORDS_MAX = 10000;
 
 	public static final int DEFAULT_SHARD_GETRECORDS_RETRIES = 3;
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 7e6a3604414..262181ae3bc 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -91,6 +91,9 @@
 	/** Exponential backoff power constant for the list shards operation. */
 	private final double listShardsExpConstant;
 
+	/** Maximum retry attempts for the list shards operation. */
+	private final int listShardsMaxRetries;
+
 	// ------------------------------------------------------------------------
 	//  getRecords() related performance settings
 	// ------------------------------------------------------------------------
@@ -104,8 +107,8 @@
 	/** Exponential backoff power constant for the get records operation. */
 	private final double getRecordsExpConstant;
 
-	/** Maximum attempts for the get records operation. */
-	private final int getRecordsMaxAttempts;
+	/** Maximum retry attempts for the get records operation. */
+	private final int getRecordsMaxRetries;
 
 	// ------------------------------------------------------------------------
 	//  getShardIterator() related performance settings
@@ -120,8 +123,8 @@
 	/** Exponential backoff power constant for the get shard iterator operation. */
 	private final double getShardIteratorExpConstant;
 
-	/** Maximum attempts for the get shard iterator operation. */
-	private final int getShardIteratorMaxAttempts;
+	/** Maximum retry attempts for the get shard iterator operation. */
+	private final int getShardIteratorMaxRetries;
 
 	/**
 	 * Create a new KinesisProxy based on the supplied configuration properties.
@@ -146,6 +149,10 @@ protected KinesisProxy(Properties configProps) {
 			configProps.getProperty(
 				ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT,
 				Double.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT)));
+		this.listShardsMaxRetries = Integer.valueOf(
+			configProps.getProperty(
+				ConsumerConfigConstants.LIST_SHARDS_RETRIES,
+				Long.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_RETRIES)));
 
 		this.getRecordsBaseBackoffMillis = Long.valueOf(
 			configProps.getProperty(
@@ -159,7 +166,7 @@ protected KinesisProxy(Properties configProps) {
 			configProps.getProperty(
 				ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT,
 				Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT)));
-		this.getRecordsMaxAttempts = Integer.valueOf(
+		this.getRecordsMaxRetries = Integer.valueOf(
 			configProps.getProperty(
 				ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES,
 				Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_RETRIES)));
@@ -176,7 +183,7 @@ protected KinesisProxy(Properties configProps) {
 			configProps.getProperty(
 				ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT,
 				Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT)));
-		this.getShardIteratorMaxAttempts = Integer.valueOf(
+		this.getShardIteratorMaxRetries = Integer.valueOf(
 			configProps.getProperty(
 				ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES,
 				Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_RETRIES)));
@@ -217,14 +224,14 @@ public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) th
 
 		GetRecordsResult getRecordsResult = null;
 
-		int attempt = 0;
-		while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) {
+		int retryCount = 0;
+		while (retryCount <= getRecordsMaxRetries && getRecordsResult == null) {
 			try {
 				getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
 			} catch (SdkClientException ex) {
 				if (isRecoverableSdkClientException(ex)) {
 					long backoffMillis = fullJitterBackoff(
-						getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
+						getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, retryCount++);
 					LOG.warn("Got recoverable SdkClientException. Backing off for "
 						+ backoffMillis + " millis (" + ex.getMessage() + ")");
 					Thread.sleep(backoffMillis);
@@ -235,7 +242,7 @@ public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) th
 		}
 
 		if (getRecordsResult == null) {
-			throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxAttempts +
+			throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxRetries +
 				" retry attempts returned ProvisionedThroughputExceededException.");
 		}
 
@@ -292,14 +299,14 @@ public String getShardIterator(StreamShardHandle shard, String shardIteratorType
 	private String getShardIterator(GetShardIteratorRequest getShardIteratorRequest) throws InterruptedException {
 		GetShardIteratorResult getShardIteratorResult = null;
 
-		int attempt = 0;
-		while (attempt <= getShardIteratorMaxAttempts && getShardIteratorResult == null) {
+		int retryCount = 0;
+		while (retryCount <= getShardIteratorMaxRetries && getShardIteratorResult == null) {
 			try {
 					getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest);
 			} catch (AmazonServiceException ex) {
 				if (isRecoverableException(ex)) {
 					long backoffMillis = fullJitterBackoff(
-						getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, attempt++);
+						getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, retryCount++);
 					LOG.warn("Got recoverable AmazonServiceException. Backing off for "
 						+ backoffMillis + " millis (" + ex.getErrorMessage() + ")");
 					Thread.sleep(backoffMillis);
@@ -310,7 +317,7 @@ private String getShardIterator(GetShardIteratorRequest getShardIteratorRequest)
 		}
 
 		if (getShardIteratorResult == null) {
-			throw new RuntimeException("Rate Exceeded for getShardIterator operation - all " + getShardIteratorMaxAttempts +
+			throw new RuntimeException("Rate Exceeded for getShardIterator operation - all " + getShardIteratorMaxRetries +
 				" retry attempts returned ProvisionedThroughputExceededException.");
 		}
 		return getShardIteratorResult.getShardIterator();
@@ -406,16 +413,16 @@ private ListShardsResult listShards(String streamName, @Nullable String startSha
 		ListShardsResult listShardsResults = null;
 
 		// Call ListShards, with full-jitter backoff (if we get LimitExceededException).
-		int attemptCount = 0;
+		int retryCount = 0;
 		// List Shards returns just the first 1000 shard entries. Make sure that all entries
 		// are taken up.
-		while (listShardsResults == null) { // retry until we get a result
+		while (retryCount <= listShardsMaxRetries && listShardsResults == null) { // retry until we get a result or the retries are exhausted
 			try {
 
 				listShardsResults = kinesisClient.listShards(listShardsRequest);
 			} catch (LimitExceededException le) {
 				long backoffMillis = fullJitterBackoff(
-						listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++);
+						listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, retryCount++);
 					LOG.warn("Got LimitExceededException when listing shards from stream " + streamName
 									+ ". Backing off for " + backoffMillis + " millis.");
 				Thread.sleep(backoffMillis);
@@ -433,6 +440,18 @@ private ListShardsResult listShards(String streamName, @Nullable String startSha
 			} catch (ExpiredNextTokenException expiredToken) {
 				LOG.warn("List Shards has an expired token. Reusing the previous state.");
 				break;
+			} catch (SdkClientException ex) {
+				if (retryCount < listShardsMaxRetries && isRecoverableSdkClientException(ex)) {
+					long backoffMillis = fullJitterBackoff(
+						listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, retryCount++);
+					LOG.warn("Got SdkClientException when listing shards from stream {}. Backing off for {} millis.",
+						streamName, backoffMillis);
+					Thread.sleep(backoffMillis);
+				} else {
+					// propagate if retries exceeded or not recoverable
+					// (otherwise would return null result and keep trying forever)
+					throw ex;
+				}
 			}
 		}
 		// Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
index 775ae4b3352..edf6ceb0d57 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
@@ -27,16 +27,24 @@
 import com.amazonaws.AmazonServiceException.ErrorType;
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.ClientConfigurationFactory;
+import com.amazonaws.SdkClientException;
 import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.AmazonKinesisException;
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
 import com.amazonaws.services.kinesis.model.ListShardsRequest;
 import com.amazonaws.services.kinesis.model.ListShardsResult;
 import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
 import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.commons.lang3.mutable.MutableInt;
 import org.hamcrest.Description;
 import org.hamcrest.TypeSafeDiagnosingMatcher;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.powermock.reflect.Whitebox;
 
 import java.util.ArrayList;
@@ -54,6 +62,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -91,6 +100,37 @@ public void testIsRecoverableExceptionWithNullErrorType() {
 		assertFalse(KinesisProxy.isRecoverableException(ex));
 	}
 
+	@Test
+	public void testGetRecordsRetry() throws Exception {
+		Properties kinesisConsumerConfig = new Properties();
+		kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+
+		final GetRecordsResult expectedResult = new GetRecordsResult();
+		MutableInt retries = new MutableInt();
+		final Throwable[] retriableExceptions = new Throwable[] {
+			new AmazonKinesisException("mock"),
+		};
+
+		AmazonKinesisClient mockClient = mock(AmazonKinesisClient.class);
+		Mockito.when(mockClient.getRecords(any())).thenAnswer(new Answer<GetRecordsResult>() {
+			@Override
+			public GetRecordsResult answer(InvocationOnMock invocation) throws Throwable{
+				if (retries.intValue() < retriableExceptions.length) {
+					retries.increment();
+					throw retriableExceptions[retries.intValue() - 1];
+				}
+				return expectedResult;
+			}
+		});
+
+		KinesisProxy kinesisProxy = new KinesisProxy(kinesisConsumerConfig);
+		Whitebox.getField(KinesisProxy.class, "kinesisClient").set(kinesisProxy, mockClient);
+
+		GetRecordsResult result = kinesisProxy.getRecords("fakeShardIterator", 1);
+		assertEquals(retriableExceptions.length, retries.intValue());
+		assertEquals(expectedResult, result);
+	}
+
 	@Test
 	public void testGetShardList() throws Exception {
 		List<String> shardIds =
@@ -151,6 +191,60 @@ public void testGetShardList() throws Exception {
 						expectedStreamShard.toArray(new StreamShardHandle[actualShardList.size()])));
 	}
 
+	@Test
+	public void testGetShardListRetry() throws Exception {
+		Properties kinesisConsumerConfig = new Properties();
+		kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+
+		Shard shard = new Shard();
+		shard.setShardId("fake-shard-000000000000");
+		final ListShardsResult expectedResult = new ListShardsResult();
+		expectedResult.withShards(shard);
+
+		MutableInt exceptionCount = new MutableInt();
+		final Throwable[] retriableExceptions = new Throwable[]{
+			new AmazonKinesisException("attempt1"),
+			new AmazonKinesisException("attempt2"),
+		};
+
+		AmazonKinesisClient mockClient = mock(AmazonKinesisClient.class);
+		Mockito.when(mockClient.listShards(any())).thenAnswer(new Answer<ListShardsResult>() {
+
+			@Override
+			public ListShardsResult answer(InvocationOnMock invocation) throws Throwable {
+				if (exceptionCount.intValue() < retriableExceptions.length) {
+					exceptionCount.increment();
+					throw retriableExceptions[exceptionCount.intValue() - 1];
+				}
+				return expectedResult;
+			}
+		});
+
+		KinesisProxy kinesisProxy = new KinesisProxy(kinesisConsumerConfig);
+		Whitebox.getField(KinesisProxy.class, "kinesisClient").set(kinesisProxy, mockClient);
+
+		HashMap<String, String> streamNames = new HashMap();
+		streamNames.put("fake-stream", null);
+		GetShardListResult result = kinesisProxy.getShardList(streamNames);
+		assertEquals(retriableExceptions.length, exceptionCount.intValue());
+		assertEquals(true, result.hasRetrievedShards());
+		assertEquals(shard.getShardId(), result.getLastSeenShardOfStream("fake-stream").getShard().getShardId());
+
+		// test max attempt count exceeded
+		int maxRetries = 1;
+		exceptionCount.setValue(0);
+		kinesisConsumerConfig.setProperty(ConsumerConfigConstants.LIST_SHARDS_RETRIES, String.valueOf(maxRetries));
+		kinesisProxy = new KinesisProxy(kinesisConsumerConfig);
+		Whitebox.getField(KinesisProxy.class, "kinesisClient").set(kinesisProxy, mockClient);
+		try {
+			kinesisProxy.getShardList(streamNames);
+			Assert.fail("exception expected");
+		} catch (SdkClientException ex) {
+			assertEquals(retriableExceptions[maxRetries], ex);
+		}
+		assertEquals(maxRetries + 1, exceptionCount.intValue());
+	}
+
 	@Test
 	public void testCustomConfigurationOverride() {
 		Properties configProps = new Properties();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services