You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/05/26 08:42:06 UTC
[7/8] flink git commit: [FLINK-6653] Avoid directly serializing AWS's
Shard class in Kinesis consumer's checkpoints
[FLINK-6653] Avoid directly serializing AWS's Shard class in Kinesis consumer's checkpoints
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/64ca1aa5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/64ca1aa5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/64ca1aa5
Branch: refs/heads/release-1.3
Commit: 64ca1aa5909d8d4e60a847679accee8d9aeb24e2
Parents: 4ae040c
Author: Tony Wei <to...@gmail.com>
Authored: Thu May 25 10:39:22 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri May 26 16:33:42 2017 +0800
----------------------------------------------------------------------
.../kinesis/FlinkKinesisConsumer.java | 71 +++++--
.../kinesis/internals/KinesisDataFetcher.java | 92 +++++++--
.../kinesis/internals/ShardConsumer.java | 8 +-
.../kinesis/model/KinesisStreamShard.java | 5 +-
.../kinesis/model/KinesisStreamShardState.java | 21 +-
.../kinesis/model/KinesisStreamShardV2.java | 171 ++++++++++++++++
.../kinesis/model/StreamShardHandle.java | 129 ++++++++++++
.../kinesis/proxy/GetShardListResult.java | 16 +-
.../connectors/kinesis/proxy/KinesisProxy.java | 12 +-
.../kinesis/proxy/KinesisProxyInterface.java | 4 +-
.../FlinkKinesisConsumerMigrationTest.java | 7 +-
.../kinesis/FlinkKinesisConsumerTest.java | 205 ++++++++++++-------
.../internals/KinesisDataFetcherTest.java | 111 +++++++---
.../kinesis/internals/ShardConsumerTest.java | 16 +-
.../testutils/FakeKinesisBehavioursFactory.java | 22 +-
15 files changed, 698 insertions(+), 192 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/64ca1aa5/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index 4982f7f..b7f5506 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -38,6 +38,8 @@ import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardV2;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
@@ -98,7 +100,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
private transient KinesisDataFetcher<T> fetcher;
/** The sequence numbers to restore to upon restore from failure */
- private transient HashMap<KinesisStreamShard, SequenceNumber> sequenceNumsToRestore;
+ private transient HashMap<KinesisStreamShardV2, SequenceNumber> sequenceNumsToRestore;
private volatile boolean running = true;
@@ -109,7 +111,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
/** State name to access shard sequence number states; cannot be changed */
private static final String sequenceNumsStateStoreName = "Kinesis-Stream-Shard-State";
- private transient ListState<Tuple2<KinesisStreamShard, SequenceNumber>> sequenceNumsStateForCheckpoint;
+ private transient ListState<Tuple2<KinesisStreamShardV2, SequenceNumber>> sequenceNumsStateForCheckpoint;
// ------------------------------------------------------------------------
// Constructors
@@ -197,25 +199,26 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
KinesisDataFetcher<T> fetcher = createFetcher(streams, sourceContext, getRuntimeContext(), configProps, deserializer);
// initial discovery
- List<KinesisStreamShard> allShards = fetcher.discoverNewShardsToSubscribe();
+ List<StreamShardHandle> allShards = fetcher.discoverNewShardsToSubscribe();
- for (KinesisStreamShard shard : allShards) {
+ for (StreamShardHandle shard : allShards) {
+ KinesisStreamShardV2 kinesisStreamShard = KinesisDataFetcher.createKinesisStreamShardV2(shard);
if (sequenceNumsToRestore != null) {
- if (sequenceNumsToRestore.containsKey(shard)) {
+ if (sequenceNumsToRestore.containsKey(kinesisStreamShard)) {
// if the shard was already seen and is contained in the state,
// just use the sequence number stored in the state
fetcher.registerNewSubscribedShardState(
- new KinesisStreamShardState(shard, sequenceNumsToRestore.get(shard)));
+ new KinesisStreamShardState(kinesisStreamShard, shard, sequenceNumsToRestore.get(kinesisStreamShard)));
if (LOG.isInfoEnabled()) {
LOG.info("Subtask {} is seeding the fetcher with restored shard {}," +
" starting state set to the restored sequence number {}",
- getRuntimeContext().getIndexOfThisSubtask(), shard.toString(), sequenceNumsToRestore.get(shard));
+ getRuntimeContext().getIndexOfThisSubtask(), shard.toString(), sequenceNumsToRestore.get(kinesisStreamShard));
}
} else {
// the shard wasn't discovered in the previous run, therefore should be consumed from the beginning
fetcher.registerNewSubscribedShardState(
- new KinesisStreamShardState(shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get()));
+ new KinesisStreamShardState(kinesisStreamShard, shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get()));
if (LOG.isInfoEnabled()) {
LOG.info("Subtask {} is seeding the fetcher with new discovered shard {}," +
@@ -231,7 +234,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION)).toSentinelSequenceNumber();
fetcher.registerNewSubscribedShardState(
- new KinesisStreamShardState(shard, startingSeqNum.get()));
+ new KinesisStreamShardState(kinesisStreamShard, shard, startingSeqNum.get()));
if (LOG.isInfoEnabled()) {
LOG.info("Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}",
@@ -295,8 +298,8 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
- TypeInformation<Tuple2<KinesisStreamShard, SequenceNumber>> shardsStateTypeInfo = new TupleTypeInfo<>(
- TypeInformation.of(KinesisStreamShard.class),
+ TypeInformation<Tuple2<KinesisStreamShardV2, SequenceNumber>> shardsStateTypeInfo = new TupleTypeInfo<>(
+ TypeInformation.of(KinesisStreamShardV2.class),
TypeInformation.of(SequenceNumber.class));
sequenceNumsStateForCheckpoint = context.getOperatorStateStore().getUnionListState(
@@ -305,7 +308,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
if (context.isRestored()) {
if (sequenceNumsToRestore == null) {
sequenceNumsToRestore = new HashMap<>();
- for (Tuple2<KinesisStreamShard, SequenceNumber> kinesisSequenceNumber : sequenceNumsStateForCheckpoint.get()) {
+ for (Tuple2<KinesisStreamShardV2, SequenceNumber> kinesisSequenceNumber : sequenceNumsStateForCheckpoint.get()) {
sequenceNumsToRestore.put(kinesisSequenceNumber.f0, kinesisSequenceNumber.f1);
}
@@ -330,12 +333,12 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
if (fetcher == null) {
if (sequenceNumsToRestore != null) {
- for (Map.Entry<KinesisStreamShard, SequenceNumber> entry : sequenceNumsToRestore.entrySet()) {
+ for (Map.Entry<KinesisStreamShardV2, SequenceNumber> entry : sequenceNumsToRestore.entrySet()) {
// sequenceNumsToRestore is the restored global union state;
// should only snapshot shards that actually belong to us
if (KinesisDataFetcher.isThisSubtaskShouldSubscribeTo(
- entry.getKey(),
+ KinesisDataFetcher.createStreamShardHandle(entry.getKey()),
getRuntimeContext().getNumberOfParallelSubtasks(),
getRuntimeContext().getIndexOfThisSubtask())) {
@@ -344,14 +347,14 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
}
}
} else {
- HashMap<KinesisStreamShard, SequenceNumber> lastStateSnapshot = fetcher.snapshotState();
+ HashMap<KinesisStreamShardV2, SequenceNumber> lastStateSnapshot = fetcher.snapshotState();
if (LOG.isDebugEnabled()) {
LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
lastStateSnapshot.toString(), context.getCheckpointId(), context.getCheckpointTimestamp());
}
- for (Map.Entry<KinesisStreamShard, SequenceNumber> entry : lastStateSnapshot.entrySet()) {
+ for (Map.Entry<KinesisStreamShardV2, SequenceNumber> entry : lastStateSnapshot.entrySet()) {
sequenceNumsStateForCheckpoint.add(Tuple2.of(entry.getKey(), entry.getValue()));
}
}
@@ -363,7 +366,14 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
LOG.info("Subtask {} restoring offsets from an older Flink version: {}",
getRuntimeContext().getIndexOfThisSubtask(), sequenceNumsToRestore);
- sequenceNumsToRestore = restoredState.isEmpty() ? null : restoredState;
+ if (restoredState.isEmpty()) {
+ sequenceNumsToRestore = null;
+ } else {
+ sequenceNumsToRestore = new HashMap<>();
+ for (Map.Entry<KinesisStreamShard, SequenceNumber> kv: restoredState.entrySet()) {
+ sequenceNumsToRestore.put(createKinesisStreamShardV2(kv.getKey()), kv.getValue());
+ }
+ }
}
/** This method is exposed for tests that need to mock the KinesisDataFetcher in the consumer. */
@@ -378,7 +388,32 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
}
@VisibleForTesting
- HashMap<KinesisStreamShard, SequenceNumber> getRestoredState() {
+ HashMap<KinesisStreamShardV2, SequenceNumber> getRestoredState() {
return sequenceNumsToRestore;
}
+
+ /**
+ * Utility function to convert a {@link KinesisStreamShard} into a {@link KinesisStreamShardV2}.
+ *
+ * @param kinesisStreamShard the {@link KinesisStreamShard} to be converted
+ * @return a {@link KinesisStreamShardV2} object
+ */
+ public static KinesisStreamShardV2 createKinesisStreamShardV2(KinesisStreamShard kinesisStreamShard) {
+ KinesisStreamShardV2 kinesisStreamShardV2 = new KinesisStreamShardV2();
+
+ kinesisStreamShardV2.setStreamName(kinesisStreamShard.getStreamName());
+ kinesisStreamShardV2.setShardId(kinesisStreamShard.getShard().getShardId());
+ kinesisStreamShardV2.setParentShardId(kinesisStreamShard.getShard().getParentShardId());
+ kinesisStreamShardV2.setAdjacentParentShardId(kinesisStreamShard.getShard().getAdjacentParentShardId());
+ if (kinesisStreamShard.getShard().getHashKeyRange() != null) {
+ kinesisStreamShardV2.setStartingHashKey(kinesisStreamShard.getShard().getHashKeyRange().getStartingHashKey());
+ kinesisStreamShardV2.setEndingHashKey(kinesisStreamShard.getShard().getHashKeyRange().getEndingHashKey());
+ }
+ if (kinesisStreamShard.getShard().getSequenceNumberRange() != null) {
+ kinesisStreamShardV2.setStartingSequenceNumber(kinesisStreamShard.getShard().getSequenceNumberRange().getStartingSequenceNumber());
+ kinesisStreamShardV2.setEndingSequenceNumber(kinesisStreamShard.getShard().getSequenceNumberRange().getEndingSequenceNumber());
+ }
+
+ return kinesisStreamShardV2;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/64ca1aa5/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 99305cb..b0dceec 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -17,13 +17,17 @@
package org.apache.flink.streaming.connectors.kinesis.internals;
+import com.amazonaws.services.kinesis.model.HashKeyRange;
+import com.amazonaws.services.kinesis.model.SequenceNumberRange;
+import com.amazonaws.services.kinesis.model.Shard;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardV2;
import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
@@ -259,7 +263,7 @@ public class KinesisDataFetcher<T> {
if (LOG.isInfoEnabled()) {
LOG.info("Subtask {} will start consuming seeded shard {} from sequence number {} with ShardConsumer {}",
- indexOfThisConsumerSubtask, seededShardState.getKinesisStreamShard().toString(),
+ indexOfThisConsumerSubtask, seededShardState.getStreamShardHandle().toString(),
seededShardState.getLastProcessedSequenceNum(), seededStateIndex);
}
@@ -267,7 +271,7 @@ public class KinesisDataFetcher<T> {
new ShardConsumer<>(
this,
seededStateIndex,
- subscribedShardsState.get(seededStateIndex).getKinesisStreamShard(),
+ subscribedShardsState.get(seededStateIndex).getStreamShardHandle(),
subscribedShardsState.get(seededStateIndex).getLastProcessedSequenceNum()));
}
}
@@ -293,19 +297,19 @@ public class KinesisDataFetcher<T> {
LOG.debug("Subtask {} is trying to discover new shards that were created due to resharding ...",
indexOfThisConsumerSubtask);
}
- List<KinesisStreamShard> newShardsDueToResharding = discoverNewShardsToSubscribe();
+ List<StreamShardHandle> newShardsDueToResharding = discoverNewShardsToSubscribe();
- for (KinesisStreamShard shard : newShardsDueToResharding) {
+ for (StreamShardHandle shard : newShardsDueToResharding) {
// since there may be delay in discovering a new shard, all new shards due to
// resharding should be read starting from the earliest record possible
KinesisStreamShardState newShardState =
- new KinesisStreamShardState(shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
+ new KinesisStreamShardState(createKinesisStreamShardV2(shard), shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
int newStateIndex = registerNewSubscribedShardState(newShardState);
if (LOG.isInfoEnabled()) {
LOG.info("Subtask {} has discovered a new shard {} due to resharding, and will start consuming " +
"the shard from sequence number {} with ShardConsumer {}",
- indexOfThisConsumerSubtask, newShardState.getKinesisStreamShard().toString(),
+ indexOfThisConsumerSubtask, newShardState.getStreamShardHandle().toString(),
newShardState.getLastProcessedSequenceNum(), newStateIndex);
}
@@ -313,7 +317,7 @@ public class KinesisDataFetcher<T> {
new ShardConsumer<>(
this,
newStateIndex,
- newShardState.getKinesisStreamShard(),
+ newShardState.getStreamShardHandle(),
newShardState.getLastProcessedSequenceNum()));
}
@@ -349,11 +353,11 @@ public class KinesisDataFetcher<T> {
*
* @return state snapshot
*/
- public HashMap<KinesisStreamShard, SequenceNumber> snapshotState() {
+ public HashMap<KinesisStreamShardV2, SequenceNumber> snapshotState() {
// this method assumes that the checkpoint lock is held
assert Thread.holdsLock(checkpointLock);
- HashMap<KinesisStreamShard, SequenceNumber> stateSnapshot = new HashMap<>();
+ HashMap<KinesisStreamShardV2, SequenceNumber> stateSnapshot = new HashMap<>();
for (KinesisStreamShardState shardWithState : subscribedShardsState) {
stateSnapshot.put(shardWithState.getKinesisStreamShard(), shardWithState.getLastProcessedSequenceNum());
}
@@ -405,7 +409,7 @@ public class KinesisDataFetcher<T> {
if (lastSeenShardIdOfStream == null) {
// if not previously set, simply put as the last seen shard id
this.subscribedStreamsToLastDiscoveredShardIds.put(stream, shardId);
- } else if (KinesisStreamShard.compareShardIds(shardId, lastSeenShardIdOfStream) > 0) {
+ } else if (StreamShardHandle.compareShardIds(shardId, lastSeenShardIdOfStream) > 0) {
this.subscribedStreamsToLastDiscoveredShardIds.put(stream, shardId);
}
}
@@ -419,17 +423,17 @@ public class KinesisDataFetcher<T> {
* 3. Update the subscribedStreamsToLastDiscoveredShardIds state so that we won't get shards
* that we have already seen before the next time this function is called
*/
- public List<KinesisStreamShard> discoverNewShardsToSubscribe() throws InterruptedException {
+ public List<StreamShardHandle> discoverNewShardsToSubscribe() throws InterruptedException {
- List<KinesisStreamShard> newShardsToSubscribe = new LinkedList<>();
+ List<StreamShardHandle> newShardsToSubscribe = new LinkedList<>();
GetShardListResult shardListResult = kinesis.getShardList(subscribedStreamsToLastDiscoveredShardIds);
if (shardListResult.hasRetrievedShards()) {
Set<String> streamsWithNewShards = shardListResult.getStreamsWithRetrievedShards();
for (String stream : streamsWithNewShards) {
- List<KinesisStreamShard> newShardsOfStream = shardListResult.getRetrievedShardListOfStream(stream);
- for (KinesisStreamShard newShard : newShardsOfStream) {
+ List<StreamShardHandle> newShardsOfStream = shardListResult.getRetrievedShardListOfStream(stream);
+ for (StreamShardHandle newShard : newShardsOfStream) {
if (isThisSubtaskShouldSubscribeTo(newShard, totalNumberOfConsumerSubtasks, indexOfThisConsumerSubtask)) {
newShardsToSubscribe.add(newShard);
}
@@ -502,7 +506,7 @@ public class KinesisDataFetcher<T> {
// we've finished reading the shard and should determine it to be non-active
if (lastSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
LOG.info("Subtask {} has reached the end of subscribed shard: {}",
- indexOfThisConsumerSubtask, subscribedShardsState.get(shardStateIndex).getKinesisStreamShard());
+ indexOfThisConsumerSubtask, subscribedShardsState.get(shardStateIndex).getStreamShardHandle());
// check if we need to mark the source as idle;
// note that on resharding, if registerNewSubscribedShardState was invoked for newly discovered shards
@@ -549,7 +553,7 @@ public class KinesisDataFetcher<T> {
* @param totalNumberOfConsumerSubtasks total number of consumer subtasks
* @param indexOfThisConsumerSubtask index of this consumer subtask
*/
- public static boolean isThisSubtaskShouldSubscribeTo(KinesisStreamShard shard,
+ public static boolean isThisSubtaskShouldSubscribeTo(StreamShardHandle shard,
int totalNumberOfConsumerSubtasks,
int indexOfThisConsumerSubtask) {
return (Math.abs(shard.hashCode() % totalNumberOfConsumerSubtasks)) == indexOfThisConsumerSubtask;
@@ -582,4 +586,58 @@ public class KinesisDataFetcher<T> {
}
return initial;
}
+
+ /**
+ * Utility function to convert {@link StreamShardHandle} into {@link KinesisStreamShardV2}
+ *
+ * @param streamShardHandle the {@link StreamShardHandle} to be converted
+ * @return a {@link KinesisStreamShardV2} object
+ */
+ public static KinesisStreamShardV2 createKinesisStreamShardV2(StreamShardHandle streamShardHandle) {
+ KinesisStreamShardV2 kinesisStreamShardV2 = new KinesisStreamShardV2();
+
+ kinesisStreamShardV2.setStreamName(streamShardHandle.getStreamName());
+ kinesisStreamShardV2.setShardId(streamShardHandle.getShard().getShardId());
+ kinesisStreamShardV2.setParentShardId(streamShardHandle.getShard().getParentShardId());
+ kinesisStreamShardV2.setAdjacentParentShardId(streamShardHandle.getShard().getAdjacentParentShardId());
+ if (streamShardHandle.getShard().getHashKeyRange() != null) {
+ kinesisStreamShardV2.setStartingHashKey(streamShardHandle.getShard().getHashKeyRange().getStartingHashKey());
+ kinesisStreamShardV2.setEndingHashKey(streamShardHandle.getShard().getHashKeyRange().getEndingHashKey());
+ }
+ if (streamShardHandle.getShard().getSequenceNumberRange() != null) {
+ kinesisStreamShardV2.setStartingSequenceNumber(streamShardHandle.getShard().getSequenceNumberRange().getStartingSequenceNumber());
+ kinesisStreamShardV2.setEndingSequenceNumber(streamShardHandle.getShard().getSequenceNumberRange().getEndingSequenceNumber());
+ }
+
+ return kinesisStreamShardV2;
+ }
+
+ /**
+ * Utility function to convert {@link KinesisStreamShardV2} into {@link StreamShardHandle}
+ *
+ * @param kinesisStreamShard the {@link KinesisStreamShardV2} to be converted
+ * @return a {@link StreamShardHandle} object
+ */
+ public static StreamShardHandle createStreamShardHandle(KinesisStreamShardV2 kinesisStreamShard) {
+ Shard shard = new Shard();
+ shard.withShardId(kinesisStreamShard.getShardId());
+ shard.withParentShardId(kinesisStreamShard.getParentShardId());
+ shard.withAdjacentParentShardId(kinesisStreamShard.getAdjacentParentShardId());
+
+ if (kinesisStreamShard.getStartingHashKey() != null && kinesisStreamShard.getEndingHashKey() != null) {
+ HashKeyRange hashKeyRange = new HashKeyRange();
+ hashKeyRange.withStartingHashKey(kinesisStreamShard.getStartingHashKey());
+ hashKeyRange.withEndingHashKey(kinesisStreamShard.getEndingHashKey());
+ shard.withHashKeyRange(hashKeyRange);
+ }
+
+ if (kinesisStreamShard.getStartingSequenceNumber() != null && kinesisStreamShard.getEndingSequenceNumber() != null) {
+ SequenceNumberRange sequenceNumberRange = new SequenceNumberRange();
+ sequenceNumberRange.withStartingSequenceNumber(kinesisStreamShard.getStartingSequenceNumber());
+ sequenceNumberRange.withEndingSequenceNumber(kinesisStreamShard.getEndingSequenceNumber());
+ shard.withSequenceNumberRange(sequenceNumberRange);
+ }
+
+ return new StreamShardHandle(kinesisStreamShard.getStreamName(), shard);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/64ca1aa5/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index ca85854..a724b49 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -24,7 +24,7 @@ import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
@@ -60,7 +60,7 @@ public class ShardConsumer<T> implements Runnable {
private final KinesisDataFetcher<T> fetcherRef;
- private final KinesisStreamShard subscribedShard;
+ private final StreamShardHandle subscribedShard;
private final int maxNumberOfRecordsPerFetch;
private final long fetchIntervalMillis;
@@ -79,7 +79,7 @@ public class ShardConsumer<T> implements Runnable {
*/
public ShardConsumer(KinesisDataFetcher<T> fetcherRef,
Integer subscribedShardStateIndex,
- KinesisStreamShard subscribedShard,
+ StreamShardHandle subscribedShard,
SequenceNumber lastSequenceNum) {
this(fetcherRef,
subscribedShardStateIndex,
@@ -91,7 +91,7 @@ public class ShardConsumer<T> implements Runnable {
/** This constructor is exposed for testing purposes */
protected ShardConsumer(KinesisDataFetcher<T> fetcherRef,
Integer subscribedShardStateIndex,
- KinesisStreamShard subscribedShard,
+ StreamShardHandle subscribedShard,
SequenceNumber lastSequenceNum,
KinesisProxyInterface kinesis) {
this.fetcherRef = checkNotNull(fetcherRef);
http://git-wip-us.apache.org/repos/asf/flink/blob/64ca1aa5/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
index 53ed11b..f3dcfe1 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
@@ -24,9 +24,8 @@ import java.io.Serializable;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * A serializable representation of a AWS Kinesis Stream shard. It is basically a wrapper class around the information
- * provided along with {@link com.amazonaws.services.kinesis.model.Shard}, with some extra utility methods to
- * determine whether or not a shard is closed and whether or not the shard is a result of parent shard splits or merges.
+ * A legacy serializable representation of an AWS Kinesis Stream shard. It is basically a wrapper class around the information
+ * provided along with {@link com.amazonaws.services.kinesis.model.Shard}.
*/
public class KinesisStreamShard implements Serializable {
http://git-wip-us.apache.org/repos/asf/flink/blob/64ca1aa5/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
index 00181da..e68129d 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
@@ -18,22 +18,28 @@
package org.apache.flink.streaming.connectors.kinesis.model;
/**
- * A wrapper class that bundles a {@link KinesisStreamShard} with its last processed sequence number.
+ * A wrapper class that bundles a {@link StreamShardHandle} with its last processed sequence number.
*/
public class KinesisStreamShardState {
- private KinesisStreamShard kinesisStreamShard;
+ private KinesisStreamShardV2 kinesisStreamShard;
+ private StreamShardHandle streamShardHandle;
private SequenceNumber lastProcessedSequenceNum;
- public KinesisStreamShardState(KinesisStreamShard kinesisStreamShard, SequenceNumber lastProcessedSequenceNum) {
+ public KinesisStreamShardState(KinesisStreamShardV2 kinesisStreamShard, StreamShardHandle streamShardHandle, SequenceNumber lastProcessedSequenceNum) {
this.kinesisStreamShard = kinesisStreamShard;
+ this.streamShardHandle = streamShardHandle;
this.lastProcessedSequenceNum = lastProcessedSequenceNum;
}
- public KinesisStreamShard getKinesisStreamShard() {
+ public KinesisStreamShardV2 getKinesisStreamShard() {
return this.kinesisStreamShard;
}
+ public StreamShardHandle getStreamShardHandle() {
+ return this.streamShardHandle;
+ }
+
public SequenceNumber getLastProcessedSequenceNum() {
return this.lastProcessedSequenceNum;
}
@@ -46,6 +52,7 @@ public class KinesisStreamShardState {
public String toString() {
return "KinesisStreamShardState{" +
"kinesisStreamShard='" + kinesisStreamShard.toString() + "'" +
+ ", streamShardHandle='" + streamShardHandle.toString() + "'" +
", lastProcessedSequenceNumber='" + lastProcessedSequenceNum.toString() + "'}";
}
@@ -61,11 +68,13 @@ public class KinesisStreamShardState {
KinesisStreamShardState other = (KinesisStreamShardState) obj;
- return kinesisStreamShard.equals(other.getKinesisStreamShard()) && lastProcessedSequenceNum.equals(other.getLastProcessedSequenceNum());
+ return kinesisStreamShard.equals(other.getKinesisStreamShard()) &&
+ streamShardHandle.equals(other.getStreamShardHandle()) &&
+ lastProcessedSequenceNum.equals(other.getLastProcessedSequenceNum());
}
@Override
public int hashCode() {
- return 37 * (kinesisStreamShard.hashCode() + lastProcessedSequenceNum.hashCode());
+ return 37 * (kinesisStreamShard.hashCode() + streamShardHandle.hashCode() + lastProcessedSequenceNum.hashCode());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/64ca1aa5/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardV2.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardV2.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardV2.java
new file mode 100644
index 0000000..71cb6fa
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardV2.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.model;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * A serializable representation of an AWS Kinesis Stream shard. It is basically a wrapper class around the information
+ * extracted from {@link com.amazonaws.services.kinesis.model.Shard} and its nested classes.
+ */
+public class KinesisStreamShardV2 implements Serializable {
+
+ private static final long serialVersionUID = 5134869582298563604L;
+
+ private String streamName;
+ private String shardId;
+ private String parentShardId;
+ private String adjacentParentShardId;
+ private String startingHashKey;
+ private String endingHashKey;
+ private String startingSequenceNumber;
+ private String endingSequenceNumber;
+
+ public void setStreamName(String streamName) {
+ this.streamName = streamName;
+ }
+
+ public void setShardId(String shardId) {
+ this.shardId = shardId;
+ }
+
+ public void setParentShardId(String parentShardId) {
+ this.parentShardId = parentShardId;
+ }
+
+ public void setAdjacentParentShardId(String adjacentParentShardId) {
+ this.adjacentParentShardId = adjacentParentShardId;
+ }
+
+ public void setStartingHashKey(String startingHashKey) {
+ this.startingHashKey = startingHashKey;
+ }
+
+ public void setEndingHashKey(String endingHashKey) {
+ this.endingHashKey = endingHashKey;
+ }
+
+ public void setStartingSequenceNumber(String startingSequenceNumber) {
+ this.startingSequenceNumber = startingSequenceNumber;
+ }
+
+ public void setEndingSequenceNumber(String endingSequenceNumber) {
+ this.endingSequenceNumber = endingSequenceNumber;
+ }
+
+ public String getStreamName() {
+ return this.streamName;
+ }
+
+ public String getShardId() {
+ return this.shardId;
+ }
+
+ public String getParentShardId() {
+ return this.parentShardId;
+ }
+
+ public String getAdjacentParentShardId() {
+ return this.adjacentParentShardId;
+ }
+
+ public String getStartingHashKey() {
+ return this.startingHashKey;
+ }
+
+ public String getEndingHashKey() {
+ return this.endingHashKey;
+ }
+
+ public String getStartingSequenceNumber() {
+ return this.startingSequenceNumber;
+ }
+
+ public String getEndingSequenceNumber() {
+ return this.endingSequenceNumber;
+ }
+
+ @Override
+ public String toString() {
+ return "KinesisStreamShardV2{" +
+ "streamName='" + streamName + "'" +
+ ", shardId='" + shardId + "'" +
+ ", parentShardId='" + parentShardId + "'" +
+ ", adjacentParentShardId='" + adjacentParentShardId + "'" +
+ ", startingHashKey='" + startingHashKey + "'" +
+ ", endingHashKey='" + endingHashKey + "'" +
+ ", startingSequenceNumber='" + startingSequenceNumber + "'" +
+ ", endingSequenceNumber='" + endingSequenceNumber + "'}";
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof KinesisStreamShardV2)) {
+ return false;
+ }
+
+ if (obj == this) {
+ return true;
+ }
+
+ KinesisStreamShardV2 other = (KinesisStreamShardV2) obj;
+
+ return streamName.equals(other.getStreamName()) &&
+ shardId.equals(other.getShardId()) &&
+ Objects.equals(parentShardId, other.getParentShardId()) &&
+ Objects.equals(adjacentParentShardId, other.getAdjacentParentShardId()) &&
+ Objects.equals(startingHashKey, other.getStartingHashKey()) &&
+ Objects.equals(endingHashKey, other.getEndingHashKey()) &&
+ Objects.equals(startingSequenceNumber, other.getStartingSequenceNumber()) &&
+ Objects.equals(endingSequenceNumber, other.getEndingSequenceNumber());
+ }
+
+ @Override
+ public int hashCode() {
+ int hash = 17;
+
+ if (streamName != null) {
+ hash = 37 * hash + streamName.hashCode();
+ }
+ if (shardId != null) {
+ hash = 37 * hash + shardId.hashCode();
+ }
+ if (parentShardId != null) {
+ hash = 37 * hash + parentShardId.hashCode();
+ }
+ if (adjacentParentShardId != null) {
+ hash = 37 * hash + adjacentParentShardId.hashCode();
+ }
+ if (startingHashKey != null) {
+ hash = 37 * hash + startingHashKey.hashCode();
+ }
+ if (endingHashKey != null) {
+ hash = 37 * hash + endingHashKey.hashCode();
+ }
+ if (startingSequenceNumber != null) {
+ hash = 37 * hash + startingSequenceNumber.hashCode();
+ }
+ if (endingSequenceNumber != null) {
+ hash = 37 * hash + endingSequenceNumber.hashCode();
+ }
+
+ return hash;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/64ca1aa5/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardHandle.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardHandle.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardHandle.java
new file mode 100644
index 0000000..d340a88
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardHandle.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.model;
+
+import com.amazonaws.services.kinesis.model.Shard;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A wrapper class around the stream name and the {@link com.amazonaws.services.kinesis.model.Shard} instance,
+ * with some extra utility methods to determine whether or not a shard is closed and whether or not the shard is
+ * a result of parent shard splits or merges.
+ */
+public class StreamShardHandle {
+
+ private final String streamName;
+ private final Shard shard;
+
+ private final int cachedHash;
+
+ /**
+ * Creates a new StreamShardHandle.
+ *
+ * @param streamName
+ * the name of the Kinesis stream that this shard belongs to
+ * @param shard
+ * the actual AWS Shard instance that will be wrapped within this StreamShardHandle
+ */
+ public StreamShardHandle(String streamName, Shard shard) {
+ this.streamName = checkNotNull(streamName);
+ this.shard = checkNotNull(shard);
+
+ // since a Kinesis Streams shard is fully identified by its stream name and shard id,
+ // our hash doesn't need to use the hash code of Amazon's Shard description, which factors in other info
+ int hash = 17;
+ hash = 37 * hash + streamName.hashCode();
+ hash = 37 * hash + shard.getShardId().hashCode();
+ this.cachedHash = hash;
+ }
+
+ public String getStreamName() {
+ return streamName;
+ }
+
+ public boolean isClosed() {
+ return (shard.getSequenceNumberRange().getEndingSequenceNumber() != null);
+ }
+
+ public Shard getShard() {
+ return shard;
+ }
+
+ @Override
+ public String toString() {
+ return "StreamShardHandle{" +
+ "streamName='" + streamName + "'" +
+ ", shard='" + shard.toString() + "'}";
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof StreamShardHandle)) {
+ return false;
+ }
+
+ if (obj == this) {
+ return true;
+ }
+
+ StreamShardHandle other = (StreamShardHandle) obj;
+
+ return streamName.equals(other.getStreamName()) && shard.equals(other.getShard());
+ }
+
+ @Override
+ public int hashCode() {
+ return cachedHash;
+ }
+
+ /**
+ * Utility function to compare two shard ids
+ *
+ * @param firstShardId first shard id to compare
+ * @param secondShardId second shard id to compare
+ * @return a value less than 0 if the first shard id is smaller than the second shard id,
+ * or a value larger than 0 if the first shard id is larger than the second shard id,
+ * or 0 if they are equal
+ */
+ public static int compareShardIds(String firstShardId, String secondShardId) {
+ if (!isValidShardId(firstShardId)) {
+ throw new IllegalArgumentException("The first shard id has invalid format.");
+ }
+
+ if (!isValidShardId(secondShardId)) {
+ throw new IllegalArgumentException("The second shard id has invalid format.");
+ }
+
+ // digit segment of the shard id starts at index 8
+ return Long.compare(Long.parseLong(firstShardId.substring(8)), Long.parseLong(secondShardId.substring(8)));
+ }
+
+ /**
+ * Checks if a shard id has valid format.
+ * Kinesis stream shard ids have 12-digit numbers left-padded with 0's,
+ * prefixed with "shardId-", ex. "shardId-000000000015".
+ *
+ * @param shardId the shard id to check
+ * @return whether the shard id is valid
+ */
+ public static boolean isValidShardId(String shardId) {
+ if (shardId == null) { return false; }
+ return shardId.matches("^shardId-\\d{12}");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/64ca1aa5/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
index 04b1654..aadb31c 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
@@ -17,7 +17,7 @@
package org.apache.flink.streaming.connectors.kinesis.proxy;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import java.util.LinkedList;
import java.util.List;
@@ -30,25 +30,25 @@ import java.util.Set;
*/
public class GetShardListResult {
- private final Map<String, LinkedList<KinesisStreamShard>> streamsToRetrievedShardList = new HashMap<>();
+ private final Map<String, LinkedList<StreamShardHandle>> streamsToRetrievedShardList = new HashMap<>();
- public void addRetrievedShardToStream(String stream, KinesisStreamShard retrievedShard) {
+ public void addRetrievedShardToStream(String stream, StreamShardHandle retrievedShard) {
if (!streamsToRetrievedShardList.containsKey(stream)) {
- streamsToRetrievedShardList.put(stream, new LinkedList<KinesisStreamShard>());
+ streamsToRetrievedShardList.put(stream, new LinkedList<StreamShardHandle>());
}
streamsToRetrievedShardList.get(stream).add(retrievedShard);
}
- public void addRetrievedShardsToStream(String stream, List<KinesisStreamShard> retrievedShards) {
+ public void addRetrievedShardsToStream(String stream, List<StreamShardHandle> retrievedShards) {
if (retrievedShards.size() != 0) {
if (!streamsToRetrievedShardList.containsKey(stream)) {
- streamsToRetrievedShardList.put(stream, new LinkedList<KinesisStreamShard>());
+ streamsToRetrievedShardList.put(stream, new LinkedList<StreamShardHandle>());
}
streamsToRetrievedShardList.get(stream).addAll(retrievedShards);
}
}
- public List<KinesisStreamShard> getRetrievedShardListOfStream(String stream) {
+ public List<StreamShardHandle> getRetrievedShardListOfStream(String stream) {
if (!streamsToRetrievedShardList.containsKey(stream)) {
return null;
} else {
@@ -56,7 +56,7 @@ public class GetShardListResult {
}
}
- public KinesisStreamShard getLastSeenShardOfStream(String stream) {
+ public StreamShardHandle getLastSeenShardOfStream(String stream) {
if (!streamsToRetrievedShardList.containsKey(stream)) {
return null;
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/64ca1aa5/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 580555f..70c1286 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -32,7 +32,7 @@ import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -237,7 +237,7 @@ public class KinesisProxy implements KinesisProxyInterface {
* {@inheritDoc}
*/
@Override
- public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable Object startingMarker) throws InterruptedException {
+ public String getShardIterator(StreamShardHandle shard, String shardIteratorType, @Nullable Object startingMarker) throws InterruptedException {
GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
.withStreamName(shard.getStreamName())
.withShardId(shard.getShard().getShardId())
@@ -315,8 +315,8 @@ public class KinesisProxy implements KinesisProxyInterface {
}
}
- private List<KinesisStreamShard> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException {
- List<KinesisStreamShard> shardsOfStream = new ArrayList<>();
+ private List<StreamShardHandle> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException {
+ List<StreamShardHandle> shardsOfStream = new ArrayList<>();
DescribeStreamResult describeStreamResult;
do {
@@ -324,7 +324,7 @@ public class KinesisProxy implements KinesisProxyInterface {
List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
for (Shard shard : shards) {
- shardsOfStream.add(new KinesisStreamShard(streamName, shard));
+ shardsOfStream.add(new StreamShardHandle(streamName, shard));
}
if (shards.size() != 0) {
@@ -384,7 +384,7 @@ public class KinesisProxy implements KinesisProxyInterface {
List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
Iterator<Shard> shardItr = shards.iterator();
while (shardItr.hasNext()) {
- if (KinesisStreamShard.compareShardIds(shardItr.next().getShardId(), startShardId) <= 0) {
+ if (StreamShardHandle.compareShardIds(shardItr.next().getShardId(), startShardId) <= 0) {
shardItr.remove();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/64ca1aa5/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
index 9f6d594..807a163 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
@@ -18,7 +18,7 @@
package org.apache.flink.streaming.connectors.kinesis.proxy;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import java.util.Map;
@@ -43,7 +43,7 @@ public interface KinesisProxyInterface {
* operation has exceeded the rate limit; this exception will be thrown
* if the backoff is interrupted.
*/
- String getShardIterator(KinesisStreamShard shard, String shardIteratorType, Object startingMarker) throws InterruptedException;
+ String getShardIterator(StreamShardHandle shard, String shardIteratorType, Object startingMarker) throws InterruptedException;
/**
* Get the next batch of data records using a specific shard iterator
http://git-wip-us.apache.org/repos/asf/flink/blob/64ca1aa5/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
index ec9a9b5..d2af6ad 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardV2;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
@@ -101,9 +102,9 @@ public class FlinkKinesisConsumerMigrationTest {
testHarness.open();
// the expected state in "kafka-consumer-migration-test-flink1.1-snapshot"
- final HashMap<KinesisStreamShard, SequenceNumber> expectedState = new HashMap<>();
- expectedState.put(new KinesisStreamShard("fakeStream1",
- new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
+ final HashMap<KinesisStreamShardV2, SequenceNumber> expectedState = new HashMap<>();
+ expectedState.put(FlinkKinesisConsumer.createKinesisStreamShardV2(new KinesisStreamShard("fakeStream1",
+ new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)))),
new SequenceNumber("987654321"));
// assert that state is correctly restored from legacy checkpoint
http://git-wip-us.apache.org/repos/asf/flink/blob/64ca1aa5/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index 4b178c7..760858a 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -17,12 +17,17 @@
package org.apache.flink.streaming.connectors.kinesis;
+import com.amazonaws.services.kinesis.model.HashKeyRange;
+import com.amazonaws.services.kinesis.model.SequenceNumberRange;
import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.StateInitializationContext;
@@ -36,6 +41,8 @@ import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardV2;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
@@ -62,6 +69,7 @@ import java.util.UUID;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@@ -109,8 +117,8 @@ public class FlinkKinesisConsumerTest {
@Test
public void testCredentialProviderTypeSetToBasicButNoCredentialSetInConfig() {
exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Please set values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " +
- "and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.");
+ exception.expectMessage("Please set values for AWS Access Key ID ('" + AWSConfigConstants.AWS_ACCESS_KEY_ID + "') " +
+ "and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.");
Properties testConfig = new Properties();
testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
@@ -535,28 +543,26 @@ public class FlinkKinesisConsumerTest {
config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
- OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
-
- List<Tuple2<KinesisStreamShard, SequenceNumber>> globalUnionState = new ArrayList<>(4);
+ List<Tuple2<KinesisStreamShardV2, SequenceNumber>> globalUnionState = new ArrayList<>(4);
globalUnionState.add(Tuple2.of(
- new KinesisStreamShard("fakeStream",
- new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
+ KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream",
+ new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)))),
new SequenceNumber("1")));
globalUnionState.add(Tuple2.of(
- new KinesisStreamShard("fakeStream",
- new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
+ KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream",
+ new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1)))),
new SequenceNumber("1")));
globalUnionState.add(Tuple2.of(
- new KinesisStreamShard("fakeStream",
- new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
+ KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream",
+ new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2)))),
new SequenceNumber("1")));
globalUnionState.add(Tuple2.of(
- new KinesisStreamShard("fakeStream",
- new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(3))),
+ KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream",
+ new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(3)))),
new SequenceNumber("1")));
- TestingListState<Tuple2<KinesisStreamShard, SequenceNumber>> listState = new TestingListState<>();
- for (Tuple2<KinesisStreamShard, SequenceNumber> state : globalUnionState) {
+ TestingListState<Tuple2<KinesisStreamShardV2, SequenceNumber>> listState = new TestingListState<>();
+ for (Tuple2<KinesisStreamShardV2, SequenceNumber> state : globalUnionState) {
listState.add(state);
}
@@ -566,10 +572,10 @@ public class FlinkKinesisConsumerTest {
when(context.getNumberOfParallelSubtasks()).thenReturn(2);
consumer.setRuntimeContext(context);
+ OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
StateInitializationContext initializationContext = mock(StateInitializationContext.class);
-
when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
when(initializationContext.isRestored()).thenReturn(true);
@@ -600,32 +606,32 @@ public class FlinkKinesisConsumerTest {
config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
- ArrayList<Tuple2<KinesisStreamShard, SequenceNumber>> initialState = new ArrayList<>(1);
+ ArrayList<Tuple2<KinesisStreamShardV2, SequenceNumber>> initialState = new ArrayList<>(1);
initialState.add(Tuple2.of(
- new KinesisStreamShard("fakeStream1",
- new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
+ KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream1",
+ new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)))),
new SequenceNumber("1")));
- ArrayList<Tuple2<KinesisStreamShard, SequenceNumber>> expectedStateSnapshot = new ArrayList<>(3);
+ ArrayList<Tuple2<KinesisStreamShardV2, SequenceNumber>> expectedStateSnapshot = new ArrayList<>(3);
expectedStateSnapshot.add(Tuple2.of(
- new KinesisStreamShard("fakeStream1",
- new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
+ KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream1",
+ new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)))),
new SequenceNumber("12")));
expectedStateSnapshot.add(Tuple2.of(
- new KinesisStreamShard("fakeStream1",
- new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
+ KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream1",
+ new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1)))),
new SequenceNumber("11")));
expectedStateSnapshot.add(Tuple2.of(
- new KinesisStreamShard("fakeStream1",
- new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
+ KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream1",
+ new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2)))),
new SequenceNumber("31")));
// ----------------------------------------------------------------------
// mock operator state backend and initial state for initializeState()
// ----------------------------------------------------------------------
- TestingListState<Tuple2<KinesisStreamShard, SequenceNumber>> listState = new TestingListState<>();
- for (Tuple2<KinesisStreamShard, SequenceNumber> state: initialState) {
+ TestingListState<Tuple2<KinesisStreamShardV2, SequenceNumber>> listState = new TestingListState<>();
+ for (Tuple2<KinesisStreamShardV2, SequenceNumber> state : initialState) {
listState.add(state);
}
@@ -640,8 +646,8 @@ public class FlinkKinesisConsumerTest {
// mock a running fetcher and its state for snapshot
// ----------------------------------------------------------------------
- HashMap<KinesisStreamShard, SequenceNumber> stateSnapshot = new HashMap<>();
- for (Tuple2<KinesisStreamShard, SequenceNumber> tuple: expectedStateSnapshot) {
+ HashMap<KinesisStreamShardV2, SequenceNumber> stateSnapshot = new HashMap<>();
+ for (Tuple2<KinesisStreamShardV2, SequenceNumber> tuple : expectedStateSnapshot) {
stateSnapshot.put(tuple.f0, tuple.f1);
}
@@ -668,15 +674,15 @@ public class FlinkKinesisConsumerTest {
assertEquals(true, listState.clearCalled);
assertEquals(3, listState.getList().size());
- for (Tuple2<KinesisStreamShard, SequenceNumber> state: initialState) {
- for (Tuple2<KinesisStreamShard, SequenceNumber> currentState: listState.getList()) {
+ for (Tuple2<KinesisStreamShardV2, SequenceNumber> state : initialState) {
+ for (Tuple2<KinesisStreamShardV2, SequenceNumber> currentState : listState.getList()) {
assertNotEquals(state, currentState);
}
}
- for (Tuple2<KinesisStreamShard, SequenceNumber> state: expectedStateSnapshot) {
+ for (Tuple2<KinesisStreamShardV2, SequenceNumber> state : expectedStateSnapshot) {
boolean hasOneIsSame = false;
- for (Tuple2<KinesisStreamShard, SequenceNumber> currentState: listState.getList()) {
+ for (Tuple2<KinesisStreamShardV2, SequenceNumber> currentState : listState.getList()) {
hasOneIsSame = hasOneIsSame || state.equals(currentState);
}
assertEquals(true, hasOneIsSame);
@@ -706,10 +712,14 @@ public class FlinkKinesisConsumerTest {
@Test
@SuppressWarnings("unchecked")
public void testFetcherShouldBeCorrectlySeededIfRestoringFromLegacyCheckpoint() throws Exception {
- HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");
+ HashMap<StreamShardHandle, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");
+ HashMap<KinesisStreamShard, SequenceNumber> legacyFakeRestoredState = new HashMap<>();
+ for (Map.Entry<StreamShardHandle, SequenceNumber> kv : fakeRestoredState.entrySet()) {
+ legacyFakeRestoredState.put(new KinesisStreamShard(kv.getKey().getStreamName(), kv.getKey().getShard()), kv.getValue());
+ }
KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
- List<KinesisStreamShard> shards = new ArrayList<>();
+ List<StreamShardHandle> shards = new ArrayList<>();
shards.addAll(fakeRestoredState.keySet());
when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
@@ -720,13 +730,14 @@ public class FlinkKinesisConsumerTest {
TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
"fakeStream", new Properties(), 10, 2);
- consumer.restoreState(fakeRestoredState);
+ consumer.restoreState(legacyFakeRestoredState);
consumer.open(new Configuration());
consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
- for (Map.Entry<KinesisStreamShard, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
+ for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
- new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue()));
+ new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredShard.getKey()),
+ restoredShard.getKey(), restoredShard.getValue()));
}
}
@@ -738,15 +749,15 @@ public class FlinkKinesisConsumerTest {
// setup initial state
// ----------------------------------------------------------------------
- HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");
+ HashMap<StreamShardHandle, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");
// ----------------------------------------------------------------------
// mock operator state backend and initial state for initializeState()
// ----------------------------------------------------------------------
- TestingListState<Tuple2<KinesisStreamShard, SequenceNumber>> listState = new TestingListState<>();
- for (Map.Entry<KinesisStreamShard, SequenceNumber> state: fakeRestoredState.entrySet()) {
- listState.add(Tuple2.of(state.getKey(), state.getValue()));
+ TestingListState<Tuple2<KinesisStreamShardV2, SequenceNumber>> listState = new TestingListState<>();
+ for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredState.entrySet()) {
+ listState.add(Tuple2.of(KinesisDataFetcher.createKinesisStreamShardV2(state.getKey()), state.getValue()));
}
OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
@@ -761,7 +772,7 @@ public class FlinkKinesisConsumerTest {
// ----------------------------------------------------------------------
KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
- List<KinesisStreamShard> shards = new ArrayList<>();
+ List<StreamShardHandle> shards = new ArrayList<>();
shards.addAll(fakeRestoredState.keySet());
when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
@@ -780,9 +791,10 @@ public class FlinkKinesisConsumerTest {
consumer.open(new Configuration());
consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
- for (Map.Entry<KinesisStreamShard, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
+ for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
- new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue()));
+ new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredShard.getKey()),
+ restoredShard.getKey(), restoredShard.getValue()));
}
}
@@ -794,20 +806,20 @@ public class FlinkKinesisConsumerTest {
// setup initial state
// ----------------------------------------------------------------------
- HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = getFakeRestoredStore("fakeStream1");
+ HashMap<StreamShardHandle, SequenceNumber> fakeRestoredState = getFakeRestoredStore("fakeStream1");
- HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredStateForOthers = getFakeRestoredStore("fakeStream2");
+ HashMap<StreamShardHandle, SequenceNumber> fakeRestoredStateForOthers = getFakeRestoredStore("fakeStream2");
// ----------------------------------------------------------------------
// mock operator state backend and initial state for initializeState()
// ----------------------------------------------------------------------
- TestingListState<Tuple2<KinesisStreamShard, SequenceNumber>> listState = new TestingListState<>();
- for (Map.Entry<KinesisStreamShard, SequenceNumber> state: fakeRestoredState.entrySet()) {
- listState.add(Tuple2.of(state.getKey(), state.getValue()));
+ TestingListState<Tuple2<KinesisStreamShardV2, SequenceNumber>> listState = new TestingListState<>();
+ for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredState.entrySet()) {
+ listState.add(Tuple2.of(KinesisDataFetcher.createKinesisStreamShardV2(state.getKey()), state.getValue()));
}
- for (Map.Entry<KinesisStreamShard, SequenceNumber> state: fakeRestoredStateForOthers.entrySet()) {
- listState.add(Tuple2.of(state.getKey(), state.getValue()));
+ for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredStateForOthers.entrySet()) {
+ listState.add(Tuple2.of(KinesisDataFetcher.createKinesisStreamShardV2(state.getKey()), state.getValue()));
}
OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
@@ -822,7 +834,7 @@ public class FlinkKinesisConsumerTest {
// ----------------------------------------------------------------------
KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
- List<KinesisStreamShard> shards = new ArrayList<>();
+ List<StreamShardHandle> shards = new ArrayList<>();
shards.addAll(fakeRestoredState.keySet());
when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
@@ -841,15 +853,17 @@ public class FlinkKinesisConsumerTest {
consumer.open(new Configuration());
consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
- for (Map.Entry<KinesisStreamShard, SequenceNumber> restoredShard : fakeRestoredStateForOthers.entrySet()) {
+ for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredStateForOthers.entrySet()) {
// should never get restored state not belonging to itself
Mockito.verify(mockedFetcher, never()).registerNewSubscribedShardState(
- new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue()));
+ new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredShard.getKey()),
+ restoredShard.getKey(), restoredShard.getValue()));
}
- for (Map.Entry<KinesisStreamShard, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
+ for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
// should get restored state belonging to itself
Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
- new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue()));
+ new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredShard.getKey()),
+ restoredShard.getKey(), restoredShard.getValue()));
}
}
@@ -890,15 +904,15 @@ public class FlinkKinesisConsumerTest {
// setup initial state
// ----------------------------------------------------------------------
- HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");
+ HashMap<StreamShardHandle, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");
// ----------------------------------------------------------------------
// mock operator state backend and initial state for initializeState()
// ----------------------------------------------------------------------
- TestingListState<Tuple2<KinesisStreamShard, SequenceNumber>> listState = new TestingListState<>();
- for (Map.Entry<KinesisStreamShard, SequenceNumber> state: fakeRestoredState.entrySet()) {
- listState.add(Tuple2.of(state.getKey(), state.getValue()));
+ TestingListState<Tuple2<KinesisStreamShardV2, SequenceNumber>> listState = new TestingListState<>();
+ for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredState.entrySet()) {
+ listState.add(Tuple2.of(KinesisDataFetcher.createKinesisStreamShardV2(state.getKey()), state.getValue()));
}
OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
@@ -913,9 +927,9 @@ public class FlinkKinesisConsumerTest {
// ----------------------------------------------------------------------
KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
- List<KinesisStreamShard> shards = new ArrayList<>();
+ List<StreamShardHandle> shards = new ArrayList<>();
shards.addAll(fakeRestoredState.keySet());
- shards.add(new KinesisStreamShard("fakeStream2",
+ shards.add(new StreamShardHandle("fakeStream2",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))));
when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
@@ -934,15 +948,58 @@ public class FlinkKinesisConsumerTest {
consumer.open(new Configuration());
consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
- fakeRestoredState.put(new KinesisStreamShard("fakeStream2",
+ fakeRestoredState.put(new StreamShardHandle("fakeStream2",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
- for (Map.Entry<KinesisStreamShard, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
+ for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
- new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue()));
+ new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredShard.getKey()),
+ restoredShard.getKey(), restoredShard.getValue()));
}
}
+ @Test
+ public void testCreateFunctionToConvertBetweenKinesisStreamShardAndKinesisStreamShardV2() {
+ String streamName = "fakeStream1";
+ String shardId = "shard-000001";
+ String parentShardId = "shard-000002";
+ String adjacentParentShardId = "shard-000003";
+ String startingHashKey = "key-000001";
+ String endingHashKey = "key-000010";
+ String startingSequenceNumber = "seq-0000021";
+ String endingSequenceNumber = "seq-00000031";
+
+ KinesisStreamShardV2 kinesisStreamShardV2 = new KinesisStreamShardV2();
+ kinesisStreamShardV2.setStreamName(streamName);
+ kinesisStreamShardV2.setShardId(shardId);
+ kinesisStreamShardV2.setParentShardId(parentShardId);
+ kinesisStreamShardV2.setAdjacentParentShardId(adjacentParentShardId);
+ kinesisStreamShardV2.setStartingHashKey(startingHashKey);
+ kinesisStreamShardV2.setEndingHashKey(endingHashKey);
+ kinesisStreamShardV2.setStartingSequenceNumber(startingSequenceNumber);
+ kinesisStreamShardV2.setEndingSequenceNumber(endingSequenceNumber);
+
+ Shard shard = new Shard()
+ .withShardId(shardId)
+ .withParentShardId(parentShardId)
+ .withAdjacentParentShardId(adjacentParentShardId)
+ .withHashKeyRange(new HashKeyRange()
+ .withStartingHashKey(startingHashKey)
+ .withEndingHashKey(endingHashKey))
+ .withSequenceNumberRange(new SequenceNumberRange()
+ .withStartingSequenceNumber(startingSequenceNumber)
+ .withEndingSequenceNumber(endingSequenceNumber));
+ KinesisStreamShard kinesisStreamShard = new KinesisStreamShard(streamName, shard);
+
+ assertEquals(kinesisStreamShardV2, FlinkKinesisConsumer.createKinesisStreamShardV2(kinesisStreamShard));
+ }
+
+ @Test
+ public void testKinesisStreamShardV2WillUsePojoSerializer() {
+ TypeInformation<KinesisStreamShardV2> typeInformation = TypeInformation.of(KinesisStreamShardV2.class);
+ assertTrue(typeInformation.createSerializer(new ExecutionConfig()) instanceof PojoSerializer);
+ }
+
private static final class TestingListState<T> implements ListState<T> {
private final List<T> list = new ArrayList<>();
@@ -973,31 +1030,31 @@ public class FlinkKinesisConsumerTest {
}
}
- private HashMap<KinesisStreamShard, SequenceNumber> getFakeRestoredStore(String streamName) {
- HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = new HashMap<>();
+ private HashMap<StreamShardHandle, SequenceNumber> getFakeRestoredStore(String streamName) {
+ HashMap<StreamShardHandle, SequenceNumber> fakeRestoredState = new HashMap<>();
if (streamName.equals("fakeStream1") || streamName.equals("all")) {
fakeRestoredState.put(
- new KinesisStreamShard("fakeStream1",
+ new StreamShardHandle("fakeStream1",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
new SequenceNumber(UUID.randomUUID().toString()));
fakeRestoredState.put(
- new KinesisStreamShard("fakeStream1",
+ new StreamShardHandle("fakeStream1",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
new SequenceNumber(UUID.randomUUID().toString()));
fakeRestoredState.put(
- new KinesisStreamShard("fakeStream1",
+ new StreamShardHandle("fakeStream1",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
new SequenceNumber(UUID.randomUUID().toString()));
}
if (streamName.equals("fakeStream2") || streamName.equals("all")) {
fakeRestoredState.put(
- new KinesisStreamShard("fakeStream2",
+ new StreamShardHandle("fakeStream2",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
new SequenceNumber(UUID.randomUUID().toString()));
fakeRestoredState.put(
- new KinesisStreamShard("fakeStream2",
+ new StreamShardHandle("fakeStream2",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
new SequenceNumber(UUID.randomUUID().toString()));
}