You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/01/20 15:49:31 UTC
flink git commit: [FLINK-5355] [kinesis] Handle
AmazonKinesisException gracefully in Kinesis Streaming Connector
Repository: flink
Updated Branches:
refs/heads/release-1.2 be09143cd -> 3b5882afa
[FLINK-5355] [kinesis] Handle AmazonKinesisException gracefully in Kinesis Streaming Connector
This closes #3078.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3b5882af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3b5882af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3b5882af
Branch: refs/heads/release-1.2
Commit: 3b5882afa0c5e60cfb39c2b098fcae2b112a5990
Parents: be09143c
Author: Scott Kidder <sc...@mux.com>
Authored: Fri Dec 16 08:46:54 2016 -0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jan 20 16:48:53 2017 +0100
----------------------------------------------------------------------
.../connectors/kinesis/proxy/KinesisProxy.java | 58 ++++++++++++++----
.../kinesis/proxy/KinesisProxyTest.java | 63 ++++++++++++++++++++
2 files changed, 108 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3b5882af/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 9ffc8e6..0b0fccf 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -17,14 +17,15 @@
package org.apache.flink.streaming.connectors.kinesis.proxy;
+import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
-import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.LimitExceededException;
+import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.StreamStatus;
import com.amazonaws.services.kinesis.model.Shard;
@@ -193,12 +194,16 @@ public class KinesisProxy implements KinesisProxyInterface {
while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) {
try {
getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
- } catch (ProvisionedThroughputExceededException ex) {
- long backoffMillis = fullJitterBackoff(
- getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
- LOG.warn("Got ProvisionedThroughputExceededException. Backing off for "
- + backoffMillis + " millis.");
- Thread.sleep(backoffMillis);
+ } catch (AmazonServiceException ex) {
+ if (isRecoverableException(ex)) {
+ long backoffMillis = fullJitterBackoff(
+ getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
+ LOG.warn("Got recoverable AmazonServiceException. Backing off for "
+ + backoffMillis + " millis (" + ex.getErrorMessage() + ")");
+ Thread.sleep(backoffMillis);
+ } else {
+ throw ex;
+ }
}
}
@@ -237,12 +242,16 @@ public class KinesisProxy implements KinesisProxyInterface {
try {
getShardIteratorResult =
kinesisClient.getShardIterator(shard.getStreamName(), shard.getShard().getShardId(), shardIteratorType, startingSeqNum);
- } catch (ProvisionedThroughputExceededException ex) {
- long backoffMillis = fullJitterBackoff(
- getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, attempt++);
- LOG.warn("Got ProvisionedThroughputExceededException. Backing off for "
- + backoffMillis + " millis.");
- Thread.sleep(backoffMillis);
+ } catch (AmazonServiceException ex) {
+ if (isRecoverableException(ex)) {
+ long backoffMillis = fullJitterBackoff(
+ getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, attempt++);
+ LOG.warn("Got recoverable AmazonServiceException. Backing off for "
+ + backoffMillis + " millis (" + ex.getErrorMessage() + ")");
+ Thread.sleep(backoffMillis);
+ } else {
+ throw ex;
+ }
}
}
@@ -253,6 +262,29 @@ public class KinesisProxy implements KinesisProxyInterface {
return getShardIteratorResult.getShardIterator();
}
+ /**
+ * Determines whether the exception is recoverable using exponential-backoff.
+ *
+ * @param ex Exception to inspect
+ * @return <code>true</code> if the exception can be recovered from, else
+ * <code>false</code>
+ */
+ protected static boolean isRecoverableException(AmazonServiceException ex) {
+ if (ex.getErrorType() == null) {
+ return false;
+ }
+
+ switch (ex.getErrorType()) {
+ case Client:
+ return ex instanceof ProvisionedThroughputExceededException;
+ case Service:
+ case Unknown:
+ return true;
+ default:
+ return false;
+ }
+ }
+
private List<KinesisStreamShard> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException {
List<KinesisStreamShard> shardsOfStream = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/3b5882af/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
new file mode 100644
index 0000000..86202c5
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+import org.junit.Test;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.AmazonServiceException.ErrorType;
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+
+/**
+ * Test for methods in the {@link KinesisProxy} class.
+ */
+public class KinesisProxyTest {
+
+ @Test
+ public void testIsRecoverableExceptionWithProvisionedThroughputExceeded() {
+ final ProvisionedThroughputExceededException ex = new ProvisionedThroughputExceededException("asdf");
+ ex.setErrorType(ErrorType.Client);
+ assertTrue(KinesisProxy.isRecoverableException(ex));
+ }
+
+ @Test
+ public void testIsRecoverableExceptionWithServiceException() {
+ final AmazonServiceException ex = new AmazonServiceException("asdf");
+ ex.setErrorType(ErrorType.Service);
+ assertTrue(KinesisProxy.isRecoverableException(ex));
+ }
+
+ @Test
+ public void testIsRecoverableExceptionWithExpiredIteratorException() {
+ final ExpiredIteratorException ex = new ExpiredIteratorException("asdf");
+ ex.setErrorType(ErrorType.Client);
+ assertFalse(KinesisProxy.isRecoverableException(ex));
+ }
+
+ @Test
+ public void testIsRecoverableExceptionWithNullErrorType() {
+ final AmazonServiceException ex = new AmazonServiceException("asdf");
+ ex.setErrorType(null);
+ assertFalse(KinesisProxy.isRecoverableException(ex));
+ }
+
+}