You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2016/11/24 10:52:31 UTC

flink git commit: [FLINK-5075] [kinesis] Make connector fail-proof to incorrect Kinesalite API behaviour

Repository: flink
Updated Branches:
  refs/heads/master 2029c14eb -> f5f4f7a27


[FLINK-5075] [kinesis] Make connector fail-proof to incorrect Kinesalite API behaviour

This closes #2822.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f5f4f7a2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f5f4f7a2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f5f4f7a2

Branch: refs/heads/master
Commit: f5f4f7a27a7beca23915e8c6030c76d820fa0dbf
Parents: 2029c14
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Thu Nov 17 14:24:24 2016 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Nov 24 18:51:18 2016 +0800

----------------------------------------------------------------------
 .../connectors/kinesis/proxy/KinesisProxy.java  | 22 ++++++++++++++++----
 1 file changed, 18 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f5f4f7a2/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 1113fde..9ffc8e6 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -34,7 +34,9 @@ import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
 import java.util.Map;
@@ -212,7 +214,7 @@ public class KinesisProxy implements KinesisProxyInterface {
 	 * {@inheritDoc}
 	 */
 	@Override
-	public GetShardListResult getShardList(Map<String,String> streamNamesWithLastSeenShardIds) throws InterruptedException {
+	public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) throws InterruptedException {
 		GetShardListResult result = new GetShardListResult();
 
 		for (Map.Entry<String,String> streamNameWithLastSeenShardId : streamNamesWithLastSeenShardIds.entrySet()) {
@@ -227,7 +229,7 @@ public class KinesisProxy implements KinesisProxyInterface {
 	 * {@inheritDoc}
 	 */
 	@Override
-	public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) throws InterruptedException {
+	public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException {
 		GetShardIteratorResult getShardIteratorResult = null;
 
 		int attempt = 0;
@@ -251,7 +253,7 @@ public class KinesisProxy implements KinesisProxyInterface {
 		return getShardIteratorResult.getShardIterator();
 	}
 
-	private List<KinesisStreamShard> getShardsOfStream(String streamName, String lastSeenShardId) throws InterruptedException {
+	private List<KinesisStreamShard> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException {
 		List<KinesisStreamShard> shardsOfStream = new ArrayList<>();
 
 		DescribeStreamResult describeStreamResult;
@@ -283,7 +285,7 @@ public class KinesisProxy implements KinesisProxyInterface {
 	 * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result)
 	 * @return the result of the describe stream operation
 	 */
-	private DescribeStreamResult describeStream(String streamName, String startShardId) throws InterruptedException {
+	private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException {
 		final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
 		describeStreamRequest.setStreamName(streamName);
 		describeStreamRequest.setExclusiveStartShardId(startShardId);
@@ -314,6 +316,18 @@ public class KinesisProxy implements KinesisProxyInterface {
 			}
 		}
 
+		// Kinesalite (mock implementation of Kinesis) does not correctly exclude shards up to and including the
+		// exclusive start shard id from the returned shards list; remove any such erroneously returned shards here
+		if (startShardId != null) {
+			List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
+			Iterator<Shard> shardItr = shards.iterator();
+			while (shardItr.hasNext()) {
+				if (KinesisStreamShard.compareShardIds(shardItr.next().getShardId(), startShardId) <= 0) {
+					shardItr.remove();
+				}
+			}
+		}
+
 		return describeStreamResult;
 	}