You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/04/18 12:10:24 UTC

[2/2] flink git commit: [FLINK-9124] [kinesis] Allow customization of KinesisProxy.getRecords read timeout and retry.

[FLINK-9124] [kinesis] Allow customization of KinesisProxy.getRecords read timeout and retry.

This closes #5803.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9fec5cae
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9fec5cae
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9fec5cae

Branch: refs/heads/master
Commit: 9fec5cae65a7cddea5dff47e5cade50493f6cbdf
Parents: d5ec911
Author: Thomas Weise <th...@apache.org>
Authored: Mon Apr 2 20:49:50 2018 -0700
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Apr 18 20:09:34 2018 +0800

----------------------------------------------------------------------
 .../connectors/kinesis/proxy/KinesisProxy.java  | 40 +++++++++++++++++---
 .../connectors/kinesis/util/AWSUtil.java        | 17 +++++++--
 .../kinesis/proxy/KinesisProxyTest.java         | 27 +++++++++++++
 3 files changed, 74 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9fec5cae/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 057e18d..3486206 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -23,6 +23,9 @@ import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
 
 import com.amazonaws.AmazonServiceException;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ClientConfigurationFactory;
+import com.amazonaws.SdkClientException;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
 import com.amazonaws.services.kinesis.model.DescribeStreamResult;
@@ -125,7 +128,7 @@ public class KinesisProxy implements KinesisProxyInterface {
 	protected KinesisProxy(Properties configProps) {
 		checkNotNull(configProps);
 
-		this.kinesisClient = AWSUtil.createKinesisClient(configProps);
+		this.kinesisClient = createKinesisClient(configProps);
 
 		this.describeStreamBaseBackoffMillis = Long.valueOf(
 			configProps.getProperty(
@@ -177,6 +180,16 @@ public class KinesisProxy implements KinesisProxyInterface {
 	}
 
 	/**
+	 * Create the Kinesis client, using the provided configuration properties and default {@link ClientConfiguration}.
+	 * Derived classes can override this method to customize the client configuration.
+	 * @param configProps configuration properties used to create the Kinesis client
+	 * @return a new AmazonKinesis client
+	 */
+	protected AmazonKinesis createKinesisClient(Properties configProps) {
+		return AWSUtil.createKinesisClient(configProps, new ClientConfigurationFactory().getConfig());
+	}
+
+	/**
 	 * Creates a Kinesis proxy.
 	 *
 	 * @param configProps configuration properties
@@ -201,12 +214,12 @@ public class KinesisProxy implements KinesisProxyInterface {
 		while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) {
 			try {
 				getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
-			} catch (AmazonServiceException ex) {
-				if (isRecoverableException(ex)) {
+			} catch (SdkClientException ex) {
+				if (isRecoverableSdkClientException(ex)) {
 					long backoffMillis = fullJitterBackoff(
 						getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
-					LOG.warn("Got recoverable AmazonServiceException. Backing off for "
-						+ backoffMillis + " millis (" + ex.getErrorMessage() + ")");
+					LOG.warn("Got recoverable SdkClientException. Backing off for "
+						+ backoffMillis + " millis (" + ex.getMessage() + ")");
 					Thread.sleep(backoffMillis);
 				} else {
 					throw ex;
@@ -303,6 +316,21 @@ public class KinesisProxy implements KinesisProxyInterface {
 	 * @return <code>true</code> if the exception can be recovered from, else
 	 *         <code>false</code>
 	 */
+	protected boolean isRecoverableSdkClientException(SdkClientException ex) {
+		if (ex instanceof AmazonServiceException) {
+			return KinesisProxy.isRecoverableException((AmazonServiceException) ex);
+		}
+		// not recoverable by default; subclasses may override this method to retry other errors, such as read timeouts
+		return false;
+	}
+
+	/**
+	 * Determines whether the exception is recoverable using exponential-backoff.
+	 *
+	 * @param ex Exception to inspect
+	 * @return <code>true</code> if the exception can be recovered from, else
+	 *         <code>false</code>
+	 */
 	protected static boolean isRecoverableException(AmazonServiceException ex) {
 		if (ex.getErrorType() == null) {
 			return false;
@@ -397,7 +425,7 @@ public class KinesisProxy implements KinesisProxyInterface {
 		return describeStreamResult;
 	}
 
-	private static long fullJitterBackoff(long base, long max, double power, int attempt) {
+	protected static long fullJitterBackoff(long base, long max, double power, int attempt) {
 		long exponentialBackoff = (long) Math.min(max, base * Math.pow(power, attempt));
 		return (long) (seed.nextDouble() * exponentialBackoff); // random jitter between 0 and the exponential backoff
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9fec5cae/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
index 15e6cce..2e9090e 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
@@ -52,11 +52,20 @@ public class AWSUtil {
 	 * @return a new AmazonKinesis client
 	 */
 	public static AmazonKinesis createKinesisClient(Properties configProps) {
+		return createKinesisClient(configProps, new ClientConfigurationFactory().getConfig());
+	}
+
+	/**
+	 * Creates an Amazon Kinesis Client.
+	 * @param configProps configuration properties containing the access key, secret key, and region
+	 * @param awsClientConfig preconfigured AWS SDK client configuration
+	 * @return a new Amazon Kinesis Client
+	 */
+	public static AmazonKinesis createKinesisClient(Properties configProps, ClientConfiguration awsClientConfig) {
 		// set a Flink-specific user agent
-		ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig()
-				.withUserAgentPrefix(String.format(USER_AGENT_FORMAT,
-														EnvironmentInformation.getVersion(),
-														EnvironmentInformation.getRevisionInformation().commitId));
+		awsClientConfig.setUserAgentPrefix(String.format(USER_AGENT_FORMAT,
+				EnvironmentInformation.getVersion(),
+				EnvironmentInformation.getRevisionInformation().commitId));
 
 		// utilize automatic refreshment of credentials by directly passing the AWSCredentialsProvider
 		AmazonKinesisClientBuilder builder = AmazonKinesisClientBuilder.standard()

http://git-wip-us.apache.org/repos/asf/flink/blob/9fec5cae/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
index 7ca05d7..c84d89b 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
@@ -17,12 +17,22 @@
 
 package org.apache.flink.streaming.connectors.kinesis.proxy;
 
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.AmazonServiceException.ErrorType;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ClientConfigurationFactory;
+import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
 import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
 import org.junit.Test;
+import org.powermock.reflect.Whitebox;
+
+import java.util.Properties;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -59,4 +69,21 @@ public class KinesisProxyTest {
 		assertFalse(KinesisProxy.isRecoverableException(ex));
 	}
 
+	@Test
+	public void testCustomConfigurationOverride() {
+		Properties configProps = new Properties();
+		configProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+		KinesisProxy proxy = new KinesisProxy(configProps) {
+			@Override
+			protected AmazonKinesis createKinesisClient(Properties configProps) {
+				ClientConfiguration clientConfig = new ClientConfigurationFactory().getConfig();
+				clientConfig.setSocketTimeout(10000);
+				return AWSUtil.createKinesisClient(configProps, clientConfig);
+			}
+		};
+		AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, "kinesisClient");
+		ClientConfiguration clientConfiguration = Whitebox.getInternalState(kinesisClient, "clientConfiguration");
+		assertEquals(10000, clientConfiguration.getSocketTimeout());
+	}
+
 }