Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/12/22 16:19:58 UTC

[GitHub] tweise closed pull request #6968: [FLINK-4582] [kinesis] Consuming data from DynamoDB streams to flink

tweise closed pull request #6968: [FLINK-4582] [kinesis] Consuming data from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968
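
For orientation, the connector added by this PR is used like the existing Kinesis consumer. A minimal sketch, assuming a placeholder stream identifier and region (the full runnable example is in ConsumeFromDynamoDBStreams further down the diff):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkDynamoDBStreamsConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class DynamoDBStreamsQuickStart {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		Properties config = new Properties();
		// "us-east-1" is a placeholder region; credentials resolve via the
		// connector's default provider chain unless configured explicitly.
		config.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");

		// "my-table-stream" is a placeholder DynamoDB stream identifier.
		env.addSource(new FlinkDynamoDBStreamsConsumer<>(
				"my-table-stream", new SimpleStringSchema(), config))
			.print();

		env.execute("dynamodb-streams-quickstart");
	}
}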
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/flink-connectors/flink-connector-kinesis/pom.xml b/flink-connectors/flink-connector-kinesis/pom.xml
index 135f1dddcc9..6c4567751d3 100644
--- a/flink-connectors/flink-connector-kinesis/pom.xml
+++ b/flink-connectors/flink-connector-kinesis/pom.xml
@@ -36,6 +36,7 @@ under the License.
 		<aws.sdk.version>1.11.319</aws.sdk.version>
 		<aws.kinesis-kcl.version>1.9.0</aws.kinesis-kcl.version>
 		<aws.kinesis-kpl.version>0.12.9</aws.kinesis-kpl.version>
+		<aws.dynamodbstreams-kinesis-adapter.version>1.4.0</aws.dynamodbstreams-kinesis-adapter.version>
 	</properties>
 
 	<packaging>jar</packaging>
@@ -119,12 +120,24 @@ under the License.
 			<exclusions>
 				<exclusion>
 					<groupId>com.amazonaws</groupId>
-					<artifactId>aws-java-sdk-dynamodb</artifactId>
+					<artifactId>aws-java-sdk-cloudwatch</artifactId>
 				</exclusion>
+			</exclusions>
+		</dependency>
+
+		<dependency>
+			<groupId>com.amazonaws</groupId>
+			<artifactId>dynamodb-streams-kinesis-adapter</artifactId>
+			<version>${aws.dynamodbstreams-kinesis-adapter.version}</version>
+			<exclusions>
 				<exclusion>
 					<groupId>com.amazonaws</groupId>
 					<artifactId>aws-java-sdk-cloudwatch</artifactId>
 				</exclusion>
+				<exclusion>
+					<groupId>com.fasterxml.jackson.core</groupId>
+					<artifactId>jackson-databind</artifactId>
+				</exclusion>
 			</exclusions>
 		</dependency>
 
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkDynamoDBStreamsConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkDynamoDBStreamsConsumer.java
new file mode 100644
index 00000000000..fd920c4b13a
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkDynamoDBStreamsConsumer.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.internals.DynamoDBStreamsDataFetcher;
+import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Consume events from DynamoDB streams.
+ *
+ * @param <T> the type of data emitted
+ */
+public class FlinkDynamoDBStreamsConsumer<T> extends FlinkKinesisConsumer<T> {
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkDynamoDBStreamsConsumer.class);
+
+	/**
+	 * Constructor of FlinkDynamoDBStreamsConsumer.
+	 *
+	 * @param stream stream to consume
+	 * @param deserializer deserialization schema
+	 * @param config config properties
+	 */
+	public FlinkDynamoDBStreamsConsumer(
+			String stream,
+			DeserializationSchema<T> deserializer,
+			Properties config) {
+		super(stream, deserializer, config);
+	}
+
+	/**
+	 * Constructor of FlinkDynamoDBStreamsConsumer.
+	 *
+	 * @param streams list of streams to consume
+	 * @param deserializer  deserialization schema
+	 * @param config config properties
+	 */
+	public FlinkDynamoDBStreamsConsumer(
+			List<String> streams,
+			KinesisDeserializationSchema deserializer,
+			Properties config) {
+		super(streams, deserializer, config);
+	}
+
+	@Override
+	protected KinesisDataFetcher<T> createFetcher(
+			List<String> streams,
+			SourceFunction.SourceContext<T> sourceContext,
+			RuntimeContext runtimeContext,
+			Properties configProps,
+			KinesisDeserializationSchema<T> deserializationSchema) {
+		return new DynamoDBStreamsDataFetcher<T>(
+				streams,
+				sourceContext,
+				runtimeContext,
+				configProps,
+				deserializationSchema,
+				getShardAssigner());
+	}
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index 42e2173474b..41ac6b877a9 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -65,31 +65,13 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
 	/** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */
 	public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";
 
-	/**
-	 * Deprecated key.
-	 *
-	 * @deprecated Use {@link ConsumerConfigConstants#LIST_SHARDS_BACKOFF_BASE} instead
-	 **/
-	@Deprecated
-	/** The base backoff time between each describeStream attempt. */
+	/** The base backoff time between each describeStream attempt (for consuming from DynamoDB streams). */
 	public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
 
-	/**
-	 * Deprecated key.
-	 *
-	 * @deprecated Use {@link ConsumerConfigConstants#LIST_SHARDS_BACKOFF_MAX} instead
-	 **/
-	@Deprecated
-	/** The maximum backoff time between each describeStream attempt. */
+	/** The maximum backoff time between each describeStream attempt (for consuming from DynamoDB streams). */
 	public static final String STREAM_DESCRIBE_BACKOFF_MAX = "flink.stream.describe.backoff.max";
 
-	/**
-	 * Deprecated key.
-	 *
-	 * @deprecated Use {@link ConsumerConfigConstants#LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT} instead
-	 **/
-	@Deprecated
-	/** The power constant for exponential backoff between each describeStream attempt. */
+	/** The power constant for exponential backoff between each describeStream attempt (for consuming from DynamoDB streams). */
 	public static final String STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = "flink.stream.describe.backoff.expconst";
 
 	/** The maximum number of listShards attempts if we get a recoverable exception. */
@@ -151,13 +133,10 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
 
 	public static final String DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
 
-	@Deprecated
 	public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE = 1000L;
 
-	@Deprecated
 	public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX = 5000L;
 
-	@Deprecated
 	public static final double DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
 
 	public static final long DEFAULT_LIST_SHARDS_BACKOFF_BASE = 1000L;
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java
new file mode 100644
index 00000000000..c2b7be352b1
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/DynamoDBStreamsDataFetcher.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
+import org.apache.flink.streaming.connectors.kinesis.metrics.ShardMetricsReporter;
+import org.apache.flink.streaming.connectors.kinesis.model.DynamoDBStreamsShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.proxy.DynamoDBStreamsProxy;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * DynamoDB streams data fetcher.
+ * @param <T> type of fetched data.
+ */
+public class DynamoDBStreamsDataFetcher<T> extends KinesisDataFetcher<T> {
+	private boolean shardIdFormatCheck = false;
+
+	/**
+	 * Constructor.
+	 *
+	 * @param streams list of streams to fetch data
+	 * @param sourceContext source context
+	 * @param runtimeContext runtime context
+	 * @param configProps config properties
+	 * @param deserializationSchema deserialization schema
+	 * @param shardAssigner shard assigner
+	 */
+	public DynamoDBStreamsDataFetcher(List<String> streams,
+		SourceFunction.SourceContext<T> sourceContext,
+		RuntimeContext runtimeContext,
+		Properties configProps,
+		KinesisDeserializationSchema<T> deserializationSchema,
+		KinesisShardAssigner shardAssigner) {
+
+		super(streams,
+			sourceContext,
+			sourceContext.getCheckpointLock(),
+			runtimeContext,
+			configProps,
+			deserializationSchema,
+			shardAssigner,
+			null,
+			new AtomicReference<>(),
+			new ArrayList<>(),
+			createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
+			// use DynamoDBStreamsProxy
+			DynamoDBStreamsProxy::create);
+	}
+
+	@Override
+	protected boolean shouldAdvanceLastDiscoveredShardId(String shardId, String lastSeenShardIdOfStream) {
+		if (DynamoDBStreamsShardHandle.compareShardIds(shardId, lastSeenShardIdOfStream) <= 0) {
+			// shardID update is valid only if the given shard id is greater
+			// than the previous last seen shard id of the stream.
+			return false;
+		}
+
+		return true;
+	}
+
+	/**
+	 * Create a new DynamoDB streams shard consumer.
+	 *
+	 * @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to
+	 * @param handle stream handle
+	 * @param lastSeqNum last sequence number
+	 * @param shardMetricsReporter the reporter to report metrics to
+	 * @return the created DynamoDB streams shard consumer
+	 */
+	@Override
+	protected ShardConsumer createShardConsumer(
+		Integer subscribedShardStateIndex,
+		StreamShardHandle handle,
+		SequenceNumber lastSeqNum,
+		ShardMetricsReporter shardMetricsReporter) {
+
+		return new ShardConsumer(
+			this,
+			subscribedShardStateIndex,
+			handle,
+			lastSeqNum,
+			DynamoDBStreamsProxy.create(getConsumerConfiguration()),
+			shardMetricsReporter);
+	}
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 77ca23c9d37..9cebc41ad26 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -518,11 +518,16 @@ public void advanceLastDiscoveredShardOfStream(String stream, String shardId) {
 		if (lastSeenShardIdOfStream == null) {
 			// if not previously set, simply put as the last seen shard id
 			this.subscribedStreamsToLastDiscoveredShardIds.put(stream, shardId);
-		} else if (StreamShardHandle.compareShardIds(shardId, lastSeenShardIdOfStream) > 0) {
+		} else if (shouldAdvanceLastDiscoveredShardId(shardId, lastSeenShardIdOfStream)) {
 			this.subscribedStreamsToLastDiscoveredShardIds.put(stream, shardId);
 		}
 	}
 
+	/** Given the last seen shard id of a stream, check whether the last discovered shard id should be advanced. */
+	protected boolean shouldAdvanceLastDiscoveredShardId(String shardId, String lastSeenShardIdOfStream) {
+		return (StreamShardHandle.compareShardIds(shardId, lastSeenShardIdOfStream) > 0);
+	}
+
 	/**
 	 * A utility function that does the following:
 	 *
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/DynamoDBStreamsShardHandle.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/DynamoDBStreamsShardHandle.java
new file mode 100644
index 00000000000..534184f4ad8
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/DynamoDBStreamsShardHandle.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.model;
+
+import com.amazonaws.services.kinesis.model.Shard;
+
+/**
+ * DynamoDB streams shard handle format and utilities.
+ */
+public class DynamoDBStreamsShardHandle extends StreamShardHandle {
+	public static final String SHARDID_PREFIX = "shardId-";
+	public static final int SHARDID_PREFIX_LEN = SHARDID_PREFIX.length();
+
+	public DynamoDBStreamsShardHandle(String streamName, Shard shard) {
+		super(streamName, shard);
+	}
+
+	public static int compareShardIds(String firstShardId, String secondShardId) {
+		if (!isValidShardId(firstShardId)) {
+			throw new IllegalArgumentException(
+				String.format("The first shard id %s has invalid format.", firstShardId));
+		} else if (!isValidShardId(secondShardId)) {
+			throw new IllegalArgumentException(
+				String.format("The second shard id %s has invalid format.", secondShardId));
+		}
+
+		return firstShardId.substring(SHARDID_PREFIX_LEN).compareTo(
+			secondShardId.substring(SHARDID_PREFIX_LEN));
+	}
+
+	/**
+	 * <p>
+	 * A DynamoDB streams shard id is a string of 28 to 65 characters.
+	 * (See https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_Shard.html)
+	 *
+	 * The shard id usually takes the form "shardId-00000001536805703746-69688cb1":
+	 * the prefix "shardId-", followed by a 20-digit timestamp string and, optionally,
+	 * a '-' plus up to 36 more word characters. Given this format, child shards created
+	 * during a re-sharding event are expected to have shard ids greater than their parents'.
+	 * </p>
+	 * @param shardId shard id
+	 * @return boolean indicating whether the given shard id is valid
+	 */
+	public static boolean isValidShardId(String shardId) {
+		return shardId == null ? false : shardId.matches("^shardId-\\d{20}-{0,1}\\w{0,36}");
+	}
+}
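
To make the ordering semantics concrete, here is a small standalone sketch using illustrative shard ids: the 20-digit timestamp section dominates the lexicographic comparison, which is why child shards created by a re-sharding compare greater than their parents.

import org.apache.flink.streaming.connectors.kinesis.model.DynamoDBStreamsShardHandle;

public class ShardIdOrderingSketch {
	public static void main(String[] args) {
		// Illustrative ids in the "shardId-<20-digit timestamp>-<suffix>" format.
		String parent = "shardId-00000001536805703746-69688cb1";
		String child = "shardId-00000001536805703800-12abc345";

		System.out.println(DynamoDBStreamsShardHandle.isValidShardId(parent)); // true

		// The child carries a later timestamp, so the comparison is negative
		// (parent sorts before child).
		System.out.println(DynamoDBStreamsShardHandle.compareShardIds(parent, child) < 0); // true
	}
}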
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/DynamoDBStreamsProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/DynamoDBStreamsProxy.java
new file mode 100644
index 00000000000..eb5620faf17
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/DynamoDBStreamsProxy.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ClientConfigurationFactory;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_ENDPOINT;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_REGION;
+import static org.apache.flink.streaming.connectors.kinesis.util.AWSUtil.getCredentialsProvider;
+import static org.apache.flink.streaming.connectors.kinesis.util.AWSUtil.setAwsClientConfigProperties;
+
+/**
+ * DynamoDB streams proxy: interface for interacting with DynamoDB streams.
+ */
+public class DynamoDBStreamsProxy extends KinesisProxy {
+	private static final Logger LOG = LoggerFactory.getLogger(DynamoDBStreamsProxy.class);
+
+	/** Used for formatting Flink-specific user agent string when creating Kinesis client. */
+	private static final String USER_AGENT_FORMAT = "Apache Flink %s (%s) DynamoDB Streams Connector";
+
+	protected DynamoDBStreamsProxy(Properties configProps) {
+		super(configProps);
+	}
+
+	/**
+	 * Creates a DynamoDB streams proxy.
+	 *
+	 * @param configProps configuration properties
+	 * @return the created DynamoDB streams proxy
+	 */
+	public static KinesisProxyInterface create(Properties configProps) {
+		return new DynamoDBStreamsProxy(configProps);
+	}
+
+	/**
+	 * Creates an AmazonDynamoDBStreamsAdapterClient.
+	 * Uses it as the internal client interacting with the DynamoDB streams.
+	 *
+	 * @param configProps configuration properties
+	 * @return an AWS DynamoDB streams adapter client
+	 */
+	@Override
+	protected AmazonKinesis createKinesisClient(Properties configProps) {
+		ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig();
+		setAwsClientConfigProperties(awsClientConfig, configProps);
+
+		AWSCredentialsProvider credentials = getCredentialsProvider(configProps);
+		awsClientConfig.setUserAgentPrefix(
+				String.format(
+						USER_AGENT_FORMAT,
+						EnvironmentInformation.getVersion(),
+						EnvironmentInformation.getRevisionInformation().commitId));
+
+		AmazonDynamoDBStreamsAdapterClient adapterClient =
+				new AmazonDynamoDBStreamsAdapterClient(credentials, awsClientConfig);
+
+		if (configProps.containsKey(AWS_ENDPOINT)) {
+			adapterClient.setEndpoint(configProps.getProperty(AWS_ENDPOINT));
+		} else {
+			adapterClient.setRegion(Region.getRegion(
+					Regions.fromName(configProps.getProperty(AWS_REGION))));
+		}
+
+		return adapterClient;
+	}
+
+	@Override
+	public GetShardListResult getShardList(
+			Map<String, String> streamNamesWithLastSeenShardIds) throws InterruptedException {
+		GetShardListResult result = new GetShardListResult();
+
+		for (Map.Entry<String, String> streamNameWithLastSeenShardId :
+				streamNamesWithLastSeenShardIds.entrySet()) {
+			String stream = streamNameWithLastSeenShardId.getKey();
+			String lastSeenShardId = streamNameWithLastSeenShardId.getValue();
+			result.addRetrievedShardsToStream(stream, getShardsOfStream(stream, lastSeenShardId));
+		}
+		return result;
+	}
+
+	private List<StreamShardHandle> getShardsOfStream(
+			String streamName,
+			@Nullable String lastSeenShardId)
+			throws InterruptedException {
+		List<StreamShardHandle> shardsOfStream = new ArrayList<>();
+
+		DescribeStreamResult describeStreamResult;
+		do {
+			describeStreamResult = describeStream(streamName, lastSeenShardId);
+			List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
+			for (Shard shard : shards) {
+				shardsOfStream.add(new StreamShardHandle(streamName, shard));
+			}
+
+			if (shards.size() != 0) {
+				lastSeenShardId = shards.get(shards.size() - 1).getShardId();
+			}
+		} while (describeStreamResult.getStreamDescription().isHasMoreShards());
+
+		return shardsOfStream;
+	}
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 262181ae3bc..4e44aebd377 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -28,6 +28,8 @@
 import com.amazonaws.ClientConfigurationFactory;
 import com.amazonaws.SdkClientException;
 import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
 import com.amazonaws.services.kinesis.model.ExpiredNextTokenException;
 import com.amazonaws.services.kinesis.model.GetRecordsRequest;
 import com.amazonaws.services.kinesis.model.GetRecordsResult;
@@ -42,6 +44,7 @@
 import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
 import com.amazonaws.services.kinesis.model.Shard;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import com.amazonaws.services.kinesis.model.StreamStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -126,6 +129,15 @@
 	/** Maximum retry attempts for the get shard iterator operation. */
 	private final int getShardIteratorMaxRetries;
 
+	/* Base backoff millis for the describe stream operation. */
+	private final long describeStreamBaseBackoffMillis;
+
+	/* Maximum backoff millis for the describe stream operation. */
+	private final long describeStreamMaxBackoffMillis;
+
+	/* Exponential backoff power constant for the describe stream operation. */
+	private final double describeStreamExpConstant;
+
 	/**
 	 * Create a new KinesisProxy based on the supplied configuration properties.
 	 *
@@ -133,7 +145,7 @@
 	 */
 	protected KinesisProxy(Properties configProps) {
 		checkNotNull(configProps);
-		KinesisConfigUtil.replaceDeprecatedConsumerKeys(configProps);
+		KinesisConfigUtil.backfillConsumerKeys(configProps);
 
 		this.kinesisClient = createKinesisClient(configProps);
 
@@ -153,7 +165,15 @@ protected KinesisProxy(Properties configProps) {
 			configProps.getProperty(
 				ConsumerConfigConstants.LIST_SHARDS_RETRIES,
 				Long.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_RETRIES)));
-
+		this.describeStreamBaseBackoffMillis = Long.valueOf(
+				configProps.getProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE,
+						Long.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE)));
+		this.describeStreamMaxBackoffMillis = Long.valueOf(
+				configProps.getProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX,
+						Long.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX)));
+		this.describeStreamExpConstant = Double.valueOf(
+				configProps.getProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT,
+						Double.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT)));
 		this.getRecordsBaseBackoffMillis = Long.valueOf(
 			configProps.getProperty(
 				ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE,
@@ -469,6 +489,61 @@ private ListShardsResult listShards(String streamName, @Nullable String startSha
 		return listShardsResults;
 	}
 
+	/**
+	 * Get metainfo for a Kinesis stream, which contains information about which shards this
+	 * Kinesis stream possesses.
+	 *
+	 * <p>This method uses the "full jitter" approach described in the AWS article
+	 * <a href="https://www.awsarchitectureblog.com/2015/03/backoff.html">
+	 *   "Exponential Backoff and Jitter"</a>.
+	 * This is necessary because concurrent calls are made by the fetchers of all parallel subtasks.
+	 * The jitter backoff helps distribute the calls across the fetchers over time.
+	 *
+	 * @param streamName the stream to describe
+	 * @param startShardId which shard to start with for this describe operation
+	 *
+	 * @return the result of the describe stream operation
+	 */
+	protected DescribeStreamResult describeStream(String streamName, @Nullable String startShardId)
+			throws InterruptedException {
+		final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
+		describeStreamRequest.setStreamName(streamName);
+		describeStreamRequest.setExclusiveStartShardId(startShardId);
+
+		DescribeStreamResult describeStreamResult = null;
+
+		// Call DescribeStream, with full-jitter backoff (if we get LimitExceededException).
+		int attemptCount = 0;
+		while (describeStreamResult == null) { // retry until we get a result
+			try {
+				describeStreamResult = kinesisClient.describeStream(describeStreamRequest);
+			} catch (LimitExceededException le) {
+				long backoffMillis = fullJitterBackoff(
+						describeStreamBaseBackoffMillis,
+						describeStreamMaxBackoffMillis,
+						describeStreamExpConstant,
+						attemptCount++);
+				LOG.warn(String.format("Got LimitExceededException when describing stream %s. "
+						+ "Backing off for %d millis.", streamName, backoffMillis));
+				Thread.sleep(backoffMillis);
+			} catch (ResourceNotFoundException re) {
+				throw new RuntimeException("Error while getting stream details", re);
+			}
+		}
+
+		String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus();
+		if (!(streamStatus.equals(StreamStatus.ACTIVE.toString())
+				|| streamStatus.equals(StreamStatus.UPDATING.toString()))) {
+			if (LOG.isWarnEnabled()) {
+				LOG.warn(String.format("The status of stream %s is %s ; result of the current "
+								+ "describeStream operation will not contain any shard information.",
+						streamName, streamStatus));
+			}
+		}
+
+		return describeStreamResult;
+	}
+
 	protected static long fullJitterBackoff(long base, long max, double power, int attempt) {
 		long exponentialBackoff = (long) Math.min(max, base * Math.pow(power, attempt));
 		return (long) (seed.nextDouble() * exponentialBackoff); // random jitter between 0 and the exponential backoff
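
For intuition about the retry policy above, here is a standalone sketch of the same full-jitter formula using the default describeStream settings from ConsumerConfigConstants (base 1000 ms, max 5000 ms, power 1.5): the exponential term is capped at the maximum, then the actual sleep is drawn uniformly between 0 and that cap.

import java.util.Random;

public class FullJitterBackoffSketch {
	private static final Random SEED = new Random();

	// Same formula as KinesisProxy#fullJitterBackoff: cap the exponential term
	// at max, then jitter uniformly between 0 and that cap.
	static long fullJitterBackoff(long base, long max, double power, int attempt) {
		long exponentialBackoff = (long) Math.min(max, base * Math.pow(power, attempt));
		return (long) (SEED.nextDouble() * exponentialBackoff);
	}

	public static void main(String[] args) {
		// The caps grow 1000, 1500, 2250, 3375, 5000 (clamped) across attempts.
		for (int attempt = 0; attempt < 5; attempt++) {
			System.out.println("attempt " + attempt + ": "
					+ fullJitterBackoff(1000L, 5000L, 1.5, attempt) + " ms");
		}
	}
}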
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/DynamoDBStreamsSchema.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/DynamoDBStreamsSchema.java
new file mode 100644
index 00000000000..ea6eeee917c
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/DynamoDBStreamsSchema.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.serialization;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import com.amazonaws.services.dynamodbv2.model.Record;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+
+/**
+ * Schema used for deserializing DynamoDB streams records.
+ */
+public class DynamoDBStreamsSchema implements KinesisDeserializationSchema<Record> {
+	private static final ObjectMapper MAPPER = new ObjectMapper();
+
+	@Override
+	public Record deserialize(byte[] message, String partitionKey, String seqNum,
+			long approxArrivalTimestamp, String stream, String shardId) throws IOException {
+		return MAPPER.readValue(message, Record.class);
+	}
+
+	@Override
+	public TypeInformation<Record> getProducedType() {
+		return TypeInformation.of(Record.class);
+	}
+
+}
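
With this schema, a job can consume typed DynamoDB Record events instead of raw strings. A minimal sketch using the list-of-streams constructor (the stream identifier and region are placeholders):

import java.util.Collections;
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkDynamoDBStreamsConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.serialization.DynamoDBStreamsSchema;

import com.amazonaws.services.dynamodbv2.model.Record;

public class TypedRecordsSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		Properties config = new Properties();
		config.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); // placeholder

		// The KinesisDeserializationSchema-based constructor takes a list of streams.
		env.addSource(new FlinkDynamoDBStreamsConsumer<Record>(
				Collections.singletonList("my-table-stream"), // placeholder stream
				new DynamoDBStreamsSchema(),
				config))
			.print();

		env.execute("dynamodb-typed-records-sketch");
	}
}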
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index 75c84cdca5a..b277696bf1a 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -193,18 +193,31 @@ public static Properties replaceDeprecatedProducerKeys(Properties configProps) {
 		return configProps;
 	}
 
-	public static Properties replaceDeprecatedConsumerKeys(Properties configProps) {
-		HashMap<String, String> deprecatedOldKeyToNewKeys = new HashMap<>();
-		deprecatedOldKeyToNewKeys.put(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_BASE);
-		deprecatedOldKeyToNewKeys.put(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_MAX);
-		deprecatedOldKeyToNewKeys.put(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT);
-		for (Map.Entry<String, String> entry : deprecatedOldKeyToNewKeys.entrySet()) {
-			String deprecatedOldKey = entry.getKey();
+	/**
+	 * <p>
+	 * A set of configuration parameters associated with the describeStream API may be used if:
+	 * 	1) a legacy client wants to consume from Kinesis
+	 * 	2) a current client wants to consume from DynamoDB streams
+	 *
+	 * In the context of 1), the set of configurations needs to be translated to the corresponding
+	 * configurations in the Kinesis listShards API. In the meantime, keep these configs since
+	 * they are still applicable in the context of 2), i.e., polling data from a DynamoDB stream.
+	 * </p>
+	 *
+	 * @param configProps original config properties.
+	 * @return backfilled config properties.
+	 */
+	public static Properties backfillConsumerKeys(Properties configProps) {
+		HashMap<String, String> oldKeyToNewKeys = new HashMap<>();
+		oldKeyToNewKeys.put(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_BASE);
+		oldKeyToNewKeys.put(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_MAX);
+		oldKeyToNewKeys.put(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT);
+		for (Map.Entry<String, String> entry : oldKeyToNewKeys.entrySet()) {
+			String oldKey = entry.getKey();
 			String newKey = entry.getValue();
-			if (configProps.containsKey(deprecatedOldKey)) {
-				LOG.warn("Please note {} property has been deprecated. Please use the {} new property key", deprecatedOldKey, newKey);
-				configProps.setProperty(newKey, configProps.getProperty(deprecatedOldKey));
-				configProps.remove(deprecatedOldKey);
+			if (configProps.containsKey(oldKey)) {
+				configProps.setProperty(newKey, configProps.getProperty(oldKey));
+				// Do not remove the oldKey since it may still be used when talking to DynamoDB streams
 			}
 		}
 		return configProps;
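
The net effect of the backfill is that a legacy describeStream key now also populates the matching listShards key, while the original key stays available for the DynamoDB streams path. A quick sketch:

import java.util.Properties;

import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;

public class BackfillSketch {
	public static void main(String[] args) {
		Properties props = new Properties();
		props.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, "2000");

		KinesisConfigUtil.backfillConsumerKeys(props);

		// Both keys now resolve to "2000": the listShards key was backfilled for
		// the Kinesis path, and the describeStream key was kept for DynamoDB streams.
		System.out.println(props.getProperty(ConsumerConfigConstants.LIST_SHARDS_BACKOFF_BASE));
		System.out.println(props.getProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE));
	}
}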
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromDynamoDBStreams.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromDynamoDBStreams.java
new file mode 100644
index 00000000000..44b1ddd1f44
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromDynamoDBStreams.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.examples;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kinesis.FlinkDynamoDBStreamsConsumer;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+
+import java.util.Properties;
+
+/**
+ * Sample command-line program of consuming data from a single DynamoDB stream.
+ */
+public class ConsumeFromDynamoDBStreams {
+	private static final String DYNAMODB_STREAM_NAME = "stream";
+
+	public static void main(String[] args) throws Exception {
+		ParameterTool pt = ParameterTool.fromArgs(args);
+
+		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+		see.setParallelism(1);
+
+		Properties dynamodbStreamsConsumerConfig = new Properties();
+		final String streamName = pt.getRequired(DYNAMODB_STREAM_NAME);
+		dynamodbStreamsConsumerConfig.setProperty(
+				ConsumerConfigConstants.AWS_REGION, pt.getRequired("region"));
+		dynamodbStreamsConsumerConfig.setProperty(
+				ConsumerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accesskey"));
+		dynamodbStreamsConsumerConfig.setProperty(
+				ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretkey"));
+
+		DataStream<String> dynamodbStreams = see.addSource(new FlinkDynamoDBStreamsConsumer<>(
+				streamName,
+				new SimpleStringSchema(),
+				dynamodbStreamsConsumerConfig));
+
+		dynamodbStreams.print();
+
+		see.execute();
+	}
+
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/model/DynamoDBStreamsShardHandleTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/model/DynamoDBStreamsShardHandleTest.java
new file mode 100644
index 00000000000..c8e34153156
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/model/DynamoDBStreamsShardHandleTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.model;
+
+import org.junit.Test;
+
+import static org.apache.flink.streaming.connectors.kinesis.model.DynamoDBStreamsShardHandle.SHARDID_PREFIX;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Shard handle unit tests.
+ */
+public class DynamoDBStreamsShardHandleTest {
+	@Test
+	public void testIsValidShardId() {
+		// normal form
+		String shardId = "shardId-00000001536805703746-69688cb1";
+		assertEquals(true, DynamoDBStreamsShardHandle.isValidShardId(shardId));
+
+		// short form
+		shardId = "shardId-00000001536805703746";
+		assertEquals(true, DynamoDBStreamsShardHandle.isValidShardId(shardId));
+
+		// long form
+		shardId = "shardId-00000001536805703746-69688cb1aljkwerijfl8228sl12a123akfla";
+		assertEquals(true, DynamoDBStreamsShardHandle.isValidShardId(shardId));
+
+		// invalid with wrong prefix
+		shardId = "sId-00000001536805703746-69688cb1";
+		assertEquals(false, DynamoDBStreamsShardHandle.isValidShardId(shardId));
+
+		// invalid with non-digits
+		shardId = "shardId-0000000153680570aabb-69688cb1";
+		assertEquals(false, DynamoDBStreamsShardHandle.isValidShardId(shardId));
+
+		// invalid with shardId too long
+		shardId = "shardId-00000001536805703746-69688cb1aljkwerijfl8228sl12a123akfla0000";
+		assertEquals(false, DynamoDBStreamsShardHandle.isValidShardId(shardId));
+	}
+
+	@Test
+	public void testCompareShardId() {
+		final int numShardIds = 10;
+		final int shardIdDigitLen = 20;
+		final String zeros = "00000000000000000000";  // twenty '0' chars
+		String shardIdValid = "shardId-00000001536805703746-69688cb1";
+		String shardIdInvalid = "shardId-0000000153680570aabb-69688cb1";
+
+		assertEquals(0, DynamoDBStreamsShardHandle.compareShardIds(shardIdValid, shardIdValid));
+
+		// comparison of invalid shardIds should yield exception
+		try {
+			DynamoDBStreamsShardHandle.compareShardIds(shardIdValid, shardIdInvalid);
+			fail("invalid shard Id" + shardIdInvalid + " should trigger exception");
+		} catch (IllegalArgumentException e) {
+			// expected
+		}
+		try {
+			DynamoDBStreamsShardHandle.compareShardIds(shardIdInvalid, shardIdValid);
+			fail("invalid shard Id" + shardIdInvalid + " should trigger exception");
+		} catch (IllegalArgumentException e) {
+			// expected
+		}
+
+		// compare randomly generated shardIds based on timestamp
+		String[] shardIds = new String[numShardIds];
+		for (int i = 0; i < numShardIds; i++) {
+			String nowStr = String.valueOf(System.currentTimeMillis());
+			if (nowStr.length() < shardIdDigitLen) {
+				shardIds[i] = SHARDID_PREFIX + zeros.substring(0, shardIdDigitLen - nowStr.length())
+						+ nowStr;
+			} else {
+				shardIds[i] = SHARDID_PREFIX + nowStr.substring(0, shardIdDigitLen);
+			}
+			try {
+				Thread.sleep(100);
+			} catch (InterruptedException e) {
+				// ignore
+			}
+		}
+		for (int i = 1; i < numShardIds - 1; i++) {
+			assertTrue(DynamoDBStreamsShardHandle.compareShardIds(shardIds[i - 1], shardIds[i]) < 0);
+			assertTrue(DynamoDBStreamsShardHandle.compareShardIds(shardIds[i], shardIds[i]) == 0);
+			assertTrue(DynamoDBStreamsShardHandle.compareShardIds(shardIds[i], shardIds[i + 1]) < 0);
+		}
+	}
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services