You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/01/20 15:49:31 UTC

flink git commit: [FLINK-5355] [kinesis] Handle AmazonKinesisException gracefully in Kinesis Streaming Connector

Repository: flink
Updated Branches:
  refs/heads/release-1.2 be09143cd -> 3b5882afa


[FLINK-5355] [kinesis] Handle AmazonKinesisException gracefully in Kinesis Streaming Connector

This closes #3078.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3b5882af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3b5882af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3b5882af

Branch: refs/heads/release-1.2
Commit: 3b5882afa0c5e60cfb39c2b098fcae2b112a5990
Parents: be09143c
Author: Scott Kidder <sc...@mux.com>
Authored: Fri Dec 16 08:46:54 2016 -0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jan 20 16:48:53 2017 +0100

----------------------------------------------------------------------
 .../connectors/kinesis/proxy/KinesisProxy.java  | 58 ++++++++++++++----
 .../kinesis/proxy/KinesisProxyTest.java         | 63 ++++++++++++++++++++
 2 files changed, 108 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3b5882af/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 9ffc8e6..0b0fccf 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -17,14 +17,15 @@
 
 package org.apache.flink.streaming.connectors.kinesis.proxy;
 
+import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.kinesis.AmazonKinesisClient;
 import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
 import com.amazonaws.services.kinesis.model.DescribeStreamResult;
 import com.amazonaws.services.kinesis.model.GetRecordsRequest;
 import com.amazonaws.services.kinesis.model.GetRecordsResult;
 import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
-import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
 import com.amazonaws.services.kinesis.model.LimitExceededException;
+import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
 import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
 import com.amazonaws.services.kinesis.model.StreamStatus;
 import com.amazonaws.services.kinesis.model.Shard;
@@ -193,12 +194,16 @@ public class KinesisProxy implements KinesisProxyInterface {
 		while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) {
 			try {
 				getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
-			} catch (ProvisionedThroughputExceededException ex) {
-				long backoffMillis = fullJitterBackoff(
-					getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
-				LOG.warn("Got ProvisionedThroughputExceededException. Backing off for "
-					+ backoffMillis + " millis.");
-				Thread.sleep(backoffMillis);
+			} catch (AmazonServiceException ex) {
+				if (isRecoverableException(ex)) {
+					long backoffMillis = fullJitterBackoff(
+						getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
+					LOG.warn("Got recoverable AmazonServiceException. Backing off for "
+						+ backoffMillis + " millis (" + ex.getErrorMessage() + ")");
+					Thread.sleep(backoffMillis);
+				} else {
+					throw ex;
+				}
 			}
 		}
 
@@ -237,12 +242,16 @@ public class KinesisProxy implements KinesisProxyInterface {
 			try {
 				getShardIteratorResult =
 					kinesisClient.getShardIterator(shard.getStreamName(), shard.getShard().getShardId(), shardIteratorType, startingSeqNum);
-			} catch (ProvisionedThroughputExceededException ex) {
-				long backoffMillis = fullJitterBackoff(
-					getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, attempt++);
-				LOG.warn("Got ProvisionedThroughputExceededException. Backing off for "
-					+ backoffMillis + " millis.");
-				Thread.sleep(backoffMillis);
+			} catch (AmazonServiceException ex) {
+				if (isRecoverableException(ex)) {
+					long backoffMillis = fullJitterBackoff(
+						getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, attempt++);
+					LOG.warn("Got recoverable AmazonServiceException. Backing off for "
+						+ backoffMillis + " millis (" + ex.getErrorMessage() + ")");
+					Thread.sleep(backoffMillis);
+				} else {
+					throw ex;
+				}
 			}
 		}
 
@@ -253,6 +262,29 @@ public class KinesisProxy implements KinesisProxyInterface {
 		return getShardIteratorResult.getShardIterator();
 	}
 
+	/**
+	 * Determines whether the exception is recoverable using exponential-backoff.
+	 * 
+	 * @param ex Exception to inspect
+	 * @return <code>true</code> if the exception can be recovered from, else
+	 *         <code>false</code>
+	 */
+	protected static boolean isRecoverableException(AmazonServiceException ex) {
+		if (ex.getErrorType() == null) {
+			return false;
+		}
+
+		switch (ex.getErrorType()) {
+			case Client:
+				return ex instanceof ProvisionedThroughputExceededException;
+			case Service:
+			case Unknown:
+				return true;
+			default:
+				return false;
+		}
+	}
+
 	private List<KinesisStreamShard> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException {
 		List<KinesisStreamShard> shardsOfStream = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3b5882af/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
new file mode 100644
index 0000000..86202c5
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+import org.junit.Test;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.AmazonServiceException.ErrorType;
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+
+/**
+ * Test for methods in the {@link KinesisProxy} class.
+ */
+public class KinesisProxyTest {
+
+	@Test
+	public void testIsRecoverableExceptionWithProvisionedThroughputExceeded() {
+		final ProvisionedThroughputExceededException ex = new ProvisionedThroughputExceededException("asdf");
+		ex.setErrorType(ErrorType.Client);
+		assertTrue(KinesisProxy.isRecoverableException(ex));
+	}
+
+	@Test
+	public void testIsRecoverableExceptionWithServiceException() {
+		final AmazonServiceException ex = new AmazonServiceException("asdf");
+		ex.setErrorType(ErrorType.Service);
+		assertTrue(KinesisProxy.isRecoverableException(ex));
+	}
+
+	@Test
+	public void testIsRecoverableExceptionWithExpiredIteratorException() {
+		final ExpiredIteratorException ex = new ExpiredIteratorException("asdf");
+		ex.setErrorType(ErrorType.Client);
+		assertFalse(KinesisProxy.isRecoverableException(ex));
+	}
+
+	@Test
+	public void testIsRecoverableExceptionWithNullErrorType() {
+		final AmazonServiceException ex = new AmazonServiceException("asdf");
+		ex.setErrorType(null);
+		assertFalse(KinesisProxy.isRecoverableException(ex));
+	}
+
+}