You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2016/11/24 10:52:31 UTC
flink git commit: [FLINK-5075] [kinesis] Make connector fail-proof to incorrect Kinesalite API behaviour
Repository: flink
Updated Branches:
refs/heads/master 2029c14eb -> f5f4f7a27
[FLINK-5075] [kinesis] Make connector fail-proof to incorrect Kinesalite API behaviour
This closes #2822.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f5f4f7a2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f5f4f7a2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f5f4f7a2
Branch: refs/heads/master
Commit: f5f4f7a27a7beca23915e8c6030c76d820fa0dbf
Parents: 2029c14
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Thu Nov 17 14:24:24 2016 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Nov 24 18:51:18 2016 +0800
----------------------------------------------------------------------
.../connectors/kinesis/proxy/KinesisProxy.java | 22 ++++++++++++++++----
1 file changed, 18 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f5f4f7a2/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 1113fde..9ffc8e6 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -34,7 +34,9 @@ import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Map;
@@ -212,7 +214,7 @@ public class KinesisProxy implements KinesisProxyInterface {
* {@inheritDoc}
*/
@Override
- public GetShardListResult getShardList(Map<String,String> streamNamesWithLastSeenShardIds) throws InterruptedException {
+ public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) throws InterruptedException {
GetShardListResult result = new GetShardListResult();
for (Map.Entry<String,String> streamNameWithLastSeenShardId : streamNamesWithLastSeenShardIds.entrySet()) {
@@ -227,7 +229,7 @@ public class KinesisProxy implements KinesisProxyInterface {
* {@inheritDoc}
*/
@Override
- public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) throws InterruptedException {
+ public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException {
GetShardIteratorResult getShardIteratorResult = null;
int attempt = 0;
@@ -251,7 +253,7 @@ public class KinesisProxy implements KinesisProxyInterface {
return getShardIteratorResult.getShardIterator();
}
- private List<KinesisStreamShard> getShardsOfStream(String streamName, String lastSeenShardId) throws InterruptedException {
+ private List<KinesisStreamShard> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException {
List<KinesisStreamShard> shardsOfStream = new ArrayList<>();
DescribeStreamResult describeStreamResult;
@@ -283,7 +285,7 @@ public class KinesisProxy implements KinesisProxyInterface {
* @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result)
* @return the result of the describe stream operation
*/
- private DescribeStreamResult describeStream(String streamName, String startShardId) throws InterruptedException {
+ private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException {
final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
describeStreamRequest.setStreamName(streamName);
describeStreamRequest.setExclusiveStartShardId(startShardId);
@@ -314,6 +316,18 @@ public class KinesisProxy implements KinesisProxyInterface {
}
}
+ // Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before the exclusive
+ // start shard id in the returned shards list; check if we need to remove these erroneously returned shards
+ if (startShardId != null) {
+ List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
+ Iterator<Shard> shardItr = shards.iterator();
+ while (shardItr.hasNext()) {
+ if (KinesisStreamShard.compareShardIds(shardItr.next().getShardId(), startShardId) <= 0) {
+ shardItr.remove();
+ }
+ }
+ }
+
return describeStreamResult;
}