You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/05/26 08:28:00 UTC
[8/8] flink git commit: [FLINK-6653] [kinesis] Improvements to removal of AWS's Shard class in checkpoints
[FLINK-6653] [kinesis] Improvements to removal of AWS's Shard class in checkpoints
This closes #3994.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2597e7e1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2597e7e1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2597e7e1
Branch: refs/heads/master
Commit: 2597e7e1803da66190ff545e705a6a4e6a6f76a2
Parents: 913be2f
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri May 26 15:56:54 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri May 26 16:27:19 2017 +0800
----------------------------------------------------------------------
.../kinesis/FlinkKinesisConsumer.java | 55 ++----
.../kinesis/internals/KinesisDataFetcher.java | 74 ++++----
.../kinesis/model/KinesisStreamShard.java | 27 +++
.../kinesis/model/KinesisStreamShardState.java | 30 ++--
.../kinesis/model/KinesisStreamShardV2.java | 171 ------------------
.../kinesis/model/StreamShardMetadata.java | 173 +++++++++++++++++++
.../FlinkKinesisConsumerMigrationTest.java | 6 +-
.../kinesis/FlinkKinesisConsumerTest.java | 94 +++++-----
.../internals/KinesisDataFetcherTest.java | 18 +-
.../kinesis/internals/ShardConsumerTest.java | 4 +-
10 files changed, 333 insertions(+), 319 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2597e7e1/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index b7f5506..ea76ccc 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -39,7 +39,7 @@ import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardSta
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardV2;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
@@ -100,7 +100,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
private transient KinesisDataFetcher<T> fetcher;
/** The sequence numbers to restore to upon restore from failure */
- private transient HashMap<KinesisStreamShardV2, SequenceNumber> sequenceNumsToRestore;
+ private transient HashMap<StreamShardMetadata, SequenceNumber> sequenceNumsToRestore;
private volatile boolean running = true;
@@ -111,7 +111,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
/** State name to access shard sequence number states; cannot be changed */
private static final String sequenceNumsStateStoreName = "Kinesis-Stream-Shard-State";
- private transient ListState<Tuple2<KinesisStreamShardV2, SequenceNumber>> sequenceNumsStateForCheckpoint;
+ private transient ListState<Tuple2<StreamShardMetadata, SequenceNumber>> sequenceNumsStateForCheckpoint;
// ------------------------------------------------------------------------
// Constructors
@@ -202,7 +202,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
List<StreamShardHandle> allShards = fetcher.discoverNewShardsToSubscribe();
for (StreamShardHandle shard : allShards) {
- KinesisStreamShardV2 kinesisStreamShard = KinesisDataFetcher.createKinesisStreamShardV2(shard);
+ StreamShardMetadata kinesisStreamShard = KinesisDataFetcher.convertToStreamShardMetadata(shard);
if (sequenceNumsToRestore != null) {
if (sequenceNumsToRestore.containsKey(kinesisStreamShard)) {
// if the shard was already seen and is contained in the state,
@@ -298,8 +298,8 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
- TypeInformation<Tuple2<KinesisStreamShardV2, SequenceNumber>> shardsStateTypeInfo = new TupleTypeInfo<>(
- TypeInformation.of(KinesisStreamShardV2.class),
+ TypeInformation<Tuple2<StreamShardMetadata, SequenceNumber>> shardsStateTypeInfo = new TupleTypeInfo<>(
+ TypeInformation.of(StreamShardMetadata.class),
TypeInformation.of(SequenceNumber.class));
sequenceNumsStateForCheckpoint = context.getOperatorStateStore().getUnionListState(
@@ -308,7 +308,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
if (context.isRestored()) {
if (sequenceNumsToRestore == null) {
sequenceNumsToRestore = new HashMap<>();
- for (Tuple2<KinesisStreamShardV2, SequenceNumber> kinesisSequenceNumber : sequenceNumsStateForCheckpoint.get()) {
+ for (Tuple2<StreamShardMetadata, SequenceNumber> kinesisSequenceNumber : sequenceNumsStateForCheckpoint.get()) {
sequenceNumsToRestore.put(kinesisSequenceNumber.f0, kinesisSequenceNumber.f1);
}
@@ -333,12 +333,12 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
if (fetcher == null) {
if (sequenceNumsToRestore != null) {
- for (Map.Entry<KinesisStreamShardV2, SequenceNumber> entry : sequenceNumsToRestore.entrySet()) {
+ for (Map.Entry<StreamShardMetadata, SequenceNumber> entry : sequenceNumsToRestore.entrySet()) {
// sequenceNumsToRestore is the restored global union state;
// should only snapshot shards that actually belong to us
if (KinesisDataFetcher.isThisSubtaskShouldSubscribeTo(
- KinesisDataFetcher.createStreamShardHandle(entry.getKey()),
+ KinesisDataFetcher.convertToStreamShardHandle(entry.getKey()),
getRuntimeContext().getNumberOfParallelSubtasks(),
getRuntimeContext().getIndexOfThisSubtask())) {
@@ -347,14 +347,14 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
}
}
} else {
- HashMap<KinesisStreamShardV2, SequenceNumber> lastStateSnapshot = fetcher.snapshotState();
+ HashMap<StreamShardMetadata, SequenceNumber> lastStateSnapshot = fetcher.snapshotState();
if (LOG.isDebugEnabled()) {
LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
lastStateSnapshot.toString(), context.getCheckpointId(), context.getCheckpointTimestamp());
}
- for (Map.Entry<KinesisStreamShardV2, SequenceNumber> entry : lastStateSnapshot.entrySet()) {
+ for (Map.Entry<StreamShardMetadata, SequenceNumber> entry : lastStateSnapshot.entrySet()) {
sequenceNumsStateForCheckpoint.add(Tuple2.of(entry.getKey(), entry.getValue()));
}
}
@@ -370,8 +370,10 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
sequenceNumsToRestore = null;
} else {
sequenceNumsToRestore = new HashMap<>();
- for (Map.Entry<KinesisStreamShard, SequenceNumber> kv: restoredState.entrySet()) {
- sequenceNumsToRestore.put(createKinesisStreamShardV2(kv.getKey()), kv.getValue());
+ for (Map.Entry<KinesisStreamShard, SequenceNumber> stateEntry : restoredState.entrySet()) {
+ sequenceNumsToRestore.put(
+ KinesisStreamShard.convertToStreamShardMetadata(stateEntry.getKey()),
+ stateEntry.getValue());
}
}
}
@@ -388,32 +390,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
}
@VisibleForTesting
- HashMap<KinesisStreamShardV2, SequenceNumber> getRestoredState() {
+ HashMap<StreamShardMetadata, SequenceNumber> getRestoredState() {
return sequenceNumsToRestore;
}
-
- /**
- * Utility function to convert {@link KinesisStreamShard} into {@link KinesisStreamShardV2}
- *
- * @param kinesisStreamShard the {@link KinesisStreamShard} to be converted
- * @return a {@link KinesisStreamShardV2} object
- */
- public static KinesisStreamShardV2 createKinesisStreamShardV2(KinesisStreamShard kinesisStreamShard) {
- KinesisStreamShardV2 kinesisStreamShardV2 = new KinesisStreamShardV2();
-
- kinesisStreamShardV2.setStreamName(kinesisStreamShard.getStreamName());
- kinesisStreamShardV2.setShardId(kinesisStreamShard.getShard().getShardId());
- kinesisStreamShardV2.setParentShardId(kinesisStreamShard.getShard().getParentShardId());
- kinesisStreamShardV2.setAdjacentParentShardId(kinesisStreamShard.getShard().getAdjacentParentShardId());
- if (kinesisStreamShard.getShard().getHashKeyRange() != null) {
- kinesisStreamShardV2.setStartingHashKey(kinesisStreamShard.getShard().getHashKeyRange().getStartingHashKey());
- kinesisStreamShardV2.setEndingHashKey(kinesisStreamShard.getShard().getHashKeyRange().getEndingHashKey());
- }
- if (kinesisStreamShard.getShard().getSequenceNumberRange() != null) {
- kinesisStreamShardV2.setStartingSequenceNumber(kinesisStreamShard.getShard().getSequenceNumberRange().getStartingSequenceNumber());
- kinesisStreamShardV2.setEndingSequenceNumber(kinesisStreamShard.getShard().getSequenceNumberRange().getEndingSequenceNumber());
- }
-
- return kinesisStreamShardV2;
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2597e7e1/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index b0dceec..11ac6d4 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardV2;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
@@ -303,7 +303,7 @@ public class KinesisDataFetcher<T> {
// since there may be delay in discovering a new shard, all new shards due to
// resharding should be read starting from the earliest record possible
KinesisStreamShardState newShardState =
- new KinesisStreamShardState(createKinesisStreamShardV2(shard), shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
+ new KinesisStreamShardState(convertToStreamShardMetadata(shard), shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
int newStateIndex = registerNewSubscribedShardState(newShardState);
if (LOG.isInfoEnabled()) {
@@ -353,13 +353,13 @@ public class KinesisDataFetcher<T> {
*
* @return state snapshot
*/
- public HashMap<KinesisStreamShardV2, SequenceNumber> snapshotState() {
+ public HashMap<StreamShardMetadata, SequenceNumber> snapshotState() {
// this method assumes that the checkpoint lock is held
assert Thread.holdsLock(checkpointLock);
- HashMap<KinesisStreamShardV2, SequenceNumber> stateSnapshot = new HashMap<>();
+ HashMap<StreamShardMetadata, SequenceNumber> stateSnapshot = new HashMap<>();
for (KinesisStreamShardState shardWithState : subscribedShardsState) {
- stateSnapshot.put(shardWithState.getKinesisStreamShard(), shardWithState.getLastProcessedSequenceNum());
+ stateSnapshot.put(shardWithState.getStreamShardMetadata(), shardWithState.getLastProcessedSequenceNum());
}
return stateSnapshot;
}
@@ -588,56 +588,54 @@ public class KinesisDataFetcher<T> {
}
/**
- * Utility function to convert {@link StreamShardHandle} into {@link KinesisStreamShardV2}
+ * Utility function to convert {@link StreamShardHandle} into {@link StreamShardMetadata}
*
* @param streamShardHandle the {@link StreamShardHandle} to be converted
- * @return a {@link KinesisStreamShardV2} object
+ * @return a {@link StreamShardMetadata} object
*/
- public static KinesisStreamShardV2 createKinesisStreamShardV2(StreamShardHandle streamShardHandle) {
- KinesisStreamShardV2 kinesisStreamShardV2 = new KinesisStreamShardV2();
+ public static StreamShardMetadata convertToStreamShardMetadata(StreamShardHandle streamShardHandle) {
+ StreamShardMetadata streamShardMetadata = new StreamShardMetadata();
+
+ streamShardMetadata.setStreamName(streamShardHandle.getStreamName());
+ streamShardMetadata.setShardId(streamShardHandle.getShard().getShardId());
+ streamShardMetadata.setParentShardId(streamShardHandle.getShard().getParentShardId());
+ streamShardMetadata.setAdjacentParentShardId(streamShardHandle.getShard().getAdjacentParentShardId());
- kinesisStreamShardV2.setStreamName(streamShardHandle.getStreamName());
- kinesisStreamShardV2.setShardId(streamShardHandle.getShard().getShardId());
- kinesisStreamShardV2.setParentShardId(streamShardHandle.getShard().getParentShardId());
- kinesisStreamShardV2.setAdjacentParentShardId(streamShardHandle.getShard().getAdjacentParentShardId());
if (streamShardHandle.getShard().getHashKeyRange() != null) {
- kinesisStreamShardV2.setStartingHashKey(streamShardHandle.getShard().getHashKeyRange().getStartingHashKey());
- kinesisStreamShardV2.setEndingHashKey(streamShardHandle.getShard().getHashKeyRange().getEndingHashKey());
+ streamShardMetadata.setStartingHashKey(streamShardHandle.getShard().getHashKeyRange().getStartingHashKey());
+ streamShardMetadata.setEndingHashKey(streamShardHandle.getShard().getHashKeyRange().getEndingHashKey());
}
+
if (streamShardHandle.getShard().getSequenceNumberRange() != null) {
- kinesisStreamShardV2.setStartingSequenceNumber(streamShardHandle.getShard().getSequenceNumberRange().getStartingSequenceNumber());
- kinesisStreamShardV2.setEndingSequenceNumber(streamShardHandle.getShard().getSequenceNumberRange().getEndingSequenceNumber());
+ streamShardMetadata.setStartingSequenceNumber(streamShardHandle.getShard().getSequenceNumberRange().getStartingSequenceNumber());
+ streamShardMetadata.setEndingSequenceNumber(streamShardHandle.getShard().getSequenceNumberRange().getEndingSequenceNumber());
}
- return kinesisStreamShardV2;
+ return streamShardMetadata;
}
/**
- * Utility function to convert {@link KinesisStreamShardV2} into {@link StreamShardHandle}
+ * Utility function to convert {@link StreamShardMetadata} into {@link StreamShardHandle}
*
- * @param kinesisStreamShard the {@link KinesisStreamShardV2} to be converted
+ * @param streamShardMetadata the {@link StreamShardMetadata} to be converted
* @return a {@link StreamShardHandle} object
*/
- public static StreamShardHandle createStreamShardHandle(KinesisStreamShardV2 kinesisStreamShard) {
+ public static StreamShardHandle convertToStreamShardHandle(StreamShardMetadata streamShardMetadata) {
Shard shard = new Shard();
- shard.withShardId(kinesisStreamShard.getShardId());
- shard.withParentShardId(kinesisStreamShard.getParentShardId());
- shard.withAdjacentParentShardId(kinesisStreamShard.getAdjacentParentShardId());
-
- if (kinesisStreamShard.getStartingHashKey() != null && kinesisStreamShard.getEndingHashKey() != null) {
- HashKeyRange hashKeyRange = new HashKeyRange();
- hashKeyRange.withStartingHashKey(kinesisStreamShard.getStartingHashKey());
- hashKeyRange.withEndingHashKey(kinesisStreamShard.getEndingHashKey());
- shard.withHashKeyRange(hashKeyRange);
- }
+ shard.withShardId(streamShardMetadata.getShardId());
+ shard.withParentShardId(streamShardMetadata.getParentShardId());
+ shard.withAdjacentParentShardId(streamShardMetadata.getAdjacentParentShardId());
- if (kinesisStreamShard.getStartingSequenceNumber() != null && kinesisStreamShard.getEndingSequenceNumber() != null) {
- SequenceNumberRange sequenceNumberRange = new SequenceNumberRange();
- sequenceNumberRange.withStartingSequenceNumber(kinesisStreamShard.getStartingSequenceNumber());
- sequenceNumberRange.withEndingSequenceNumber(kinesisStreamShard.getEndingSequenceNumber());
- shard.withSequenceNumberRange(sequenceNumberRange);
- }
+ HashKeyRange hashKeyRange = new HashKeyRange();
+ hashKeyRange.withStartingHashKey(streamShardMetadata.getStartingHashKey());
+ hashKeyRange.withEndingHashKey(streamShardMetadata.getEndingHashKey());
+ shard.withHashKeyRange(hashKeyRange);
+
+ SequenceNumberRange sequenceNumberRange = new SequenceNumberRange();
+ sequenceNumberRange.withStartingSequenceNumber(streamShardMetadata.getStartingSequenceNumber());
+ sequenceNumberRange.withEndingSequenceNumber(streamShardMetadata.getEndingSequenceNumber());
+ shard.withSequenceNumberRange(sequenceNumberRange);
- return new StreamShardHandle(kinesisStreamShard.getStreamName(), shard);
+ return new StreamShardHandle(streamShardMetadata.getStreamName(), shard);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2597e7e1/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
index f3dcfe1..592e30d 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
@@ -129,4 +129,31 @@ public class KinesisStreamShard implements Serializable {
if (shardId == null) { return false; }
return shardId.matches("^shardId-\\d{12}");
}
+
+ /**
+ * Utility function to convert {@link KinesisStreamShard} into the new {@link StreamShardMetadata} model.
+ *
+ * @param kinesisStreamShard the {@link KinesisStreamShard} to be converted
+ * @return the converted {@link StreamShardMetadata}
+ */
+ public static StreamShardMetadata convertToStreamShardMetadata(KinesisStreamShard kinesisStreamShard) {
+ StreamShardMetadata streamShardMetadata = new StreamShardMetadata();
+
+ streamShardMetadata.setStreamName(kinesisStreamShard.getStreamName());
+ streamShardMetadata.setShardId(kinesisStreamShard.getShard().getShardId());
+ streamShardMetadata.setParentShardId(kinesisStreamShard.getShard().getParentShardId());
+ streamShardMetadata.setAdjacentParentShardId(kinesisStreamShard.getShard().getAdjacentParentShardId());
+
+ if (kinesisStreamShard.getShard().getHashKeyRange() != null) {
+ streamShardMetadata.setStartingHashKey(kinesisStreamShard.getShard().getHashKeyRange().getStartingHashKey());
+ streamShardMetadata.setEndingHashKey(kinesisStreamShard.getShard().getHashKeyRange().getEndingHashKey());
+ }
+
+ if (kinesisStreamShard.getShard().getSequenceNumberRange() != null) {
+ streamShardMetadata.setStartingSequenceNumber(kinesisStreamShard.getShard().getSequenceNumberRange().getStartingSequenceNumber());
+ streamShardMetadata.setEndingSequenceNumber(kinesisStreamShard.getShard().getSequenceNumberRange().getEndingSequenceNumber());
+ }
+
+ return streamShardMetadata;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2597e7e1/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
index e68129d..4b1cc1c 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
@@ -17,23 +17,33 @@
package org.apache.flink.streaming.connectors.kinesis.model;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.flink.util.Preconditions;
+
/**
* A wrapper class that bundles a {@link StreamShardHandle} with its last processed sequence number.
*/
public class KinesisStreamShardState {
- private KinesisStreamShardV2 kinesisStreamShard;
+ /** A handle object that wraps the actual {@link Shard} instance and stream name. */
private StreamShardHandle streamShardHandle;
+
+ /** The checkpointed state for each Kinesis stream shard. */
+ private StreamShardMetadata streamShardMetadata;
private SequenceNumber lastProcessedSequenceNum;
- public KinesisStreamShardState(KinesisStreamShardV2 kinesisStreamShard, StreamShardHandle streamShardHandle, SequenceNumber lastProcessedSequenceNum) {
- this.kinesisStreamShard = kinesisStreamShard;
- this.streamShardHandle = streamShardHandle;
- this.lastProcessedSequenceNum = lastProcessedSequenceNum;
+ public KinesisStreamShardState(
+ StreamShardMetadata streamShardMetadata,
+ StreamShardHandle streamShardHandle,
+ SequenceNumber lastProcessedSequenceNum) {
+
+ this.streamShardMetadata = Preconditions.checkNotNull(streamShardMetadata);
+ this.streamShardHandle = Preconditions.checkNotNull(streamShardHandle);
+ this.lastProcessedSequenceNum = Preconditions.checkNotNull(lastProcessedSequenceNum);
}
- public KinesisStreamShardV2 getKinesisStreamShard() {
- return this.kinesisStreamShard;
+ public StreamShardMetadata getStreamShardMetadata() {
+ return this.streamShardMetadata;
}
public StreamShardHandle getStreamShardHandle() {
@@ -51,7 +61,7 @@ public class KinesisStreamShardState {
@Override
public String toString() {
return "KinesisStreamShardState{" +
- "kinesisStreamShard='" + kinesisStreamShard.toString() + "'" +
+ "streamShardMetadata='" + streamShardMetadata.toString() + "'" +
", streamShardHandle='" + streamShardHandle.toString() + "'" +
", lastProcessedSequenceNumber='" + lastProcessedSequenceNum.toString() + "'}";
}
@@ -68,13 +78,13 @@ public class KinesisStreamShardState {
KinesisStreamShardState other = (KinesisStreamShardState) obj;
- return kinesisStreamShard.equals(other.getKinesisStreamShard()) &&
+ return streamShardMetadata.equals(other.getStreamShardMetadata()) &&
streamShardHandle.equals(other.getStreamShardHandle()) &&
lastProcessedSequenceNum.equals(other.getLastProcessedSequenceNum());
}
@Override
public int hashCode() {
- return 37 * (kinesisStreamShard.hashCode() + streamShardHandle.hashCode() + lastProcessedSequenceNum.hashCode());
+ return 37 * (streamShardMetadata.hashCode() + streamShardHandle.hashCode() + lastProcessedSequenceNum.hashCode());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2597e7e1/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardV2.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardV2.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardV2.java
deleted file mode 100644
index 71cb6fa..0000000
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardV2.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.model;
-
-import java.io.Serializable;
-import java.util.Objects;
-
-/**
- * A serializable representation of a AWS Kinesis Stream shard. It is basically a wrapper class around the information
- * disintegrating from {@link com.amazonaws.services.kinesis.model.Shard} and its nested classes.
- */
-public class KinesisStreamShardV2 implements Serializable {
-
- private static final long serialVersionUID = 5134869582298563604L;
-
- private String streamName;
- private String shardId;
- private String parentShardId;
- private String adjacentParentShardId;
- private String startingHashKey;
- private String endingHashKey;
- private String startingSequenceNumber;
- private String endingSequenceNumber;
-
- public void setStreamName(String streamName) {
- this.streamName = streamName;
- }
-
- public void setShardId(String shardId) {
- this.shardId = shardId;
- }
-
- public void setParentShardId(String parentShardId) {
- this.parentShardId = parentShardId;
- }
-
- public void setAdjacentParentShardId(String adjacentParentShardId) {
- this.adjacentParentShardId = adjacentParentShardId;
- }
-
- public void setStartingHashKey(String startingHashKey) {
- this.startingHashKey = startingHashKey;
- }
-
- public void setEndingHashKey(String endingHashKey) {
- this.endingHashKey = endingHashKey;
- }
-
- public void setStartingSequenceNumber(String startingSequenceNumber) {
- this.startingSequenceNumber = startingSequenceNumber;
- }
-
- public void setEndingSequenceNumber(String endingSequenceNumber) {
- this.endingSequenceNumber = endingSequenceNumber;
- }
-
- public String getStreamName() {
- return this.streamName;
- }
-
- public String getShardId() {
- return this.shardId;
- }
-
- public String getParentShardId() {
- return this.parentShardId;
- }
-
- public String getAdjacentParentShardId() {
- return this.adjacentParentShardId;
- }
-
- public String getStartingHashKey() {
- return this.startingHashKey;
- }
-
- public String getEndingHashKey() {
- return this.endingHashKey;
- }
-
- public String getStartingSequenceNumber() {
- return this.startingSequenceNumber;
- }
-
- public String getEndingSequenceNumber() {
- return this.endingSequenceNumber;
- }
-
- @Override
- public String toString() {
- return "KinesisStreamShardV2{" +
- "streamName='" + streamName + "'" +
- ", shardId='" + shardId + "'" +
- ", parentShardId='" + parentShardId + "'" +
- ", adjacentParentShardId='" + adjacentParentShardId + "'" +
- ", startingHashKey='" + startingHashKey + "'" +
- ", endingHashKey='" + endingHashKey + "'" +
- ", startingSequenceNumber='" + startingSequenceNumber + "'" +
- ", endingSequenceNumber='" + endingSequenceNumber + "'}";
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof KinesisStreamShardV2)) {
- return false;
- }
-
- if (obj == this) {
- return true;
- }
-
- KinesisStreamShardV2 other = (KinesisStreamShardV2) obj;
-
- return streamName.equals(other.getStreamName()) &&
- shardId.equals(other.getShardId()) &&
- Objects.equals(parentShardId, other.getParentShardId()) &&
- Objects.equals(adjacentParentShardId, other.getAdjacentParentShardId()) &&
- Objects.equals(startingHashKey, other.getStartingHashKey()) &&
- Objects.equals(endingHashKey, other.getEndingHashKey()) &&
- Objects.equals(startingSequenceNumber, other.getStartingSequenceNumber()) &&
- Objects.equals(endingSequenceNumber, other.getEndingSequenceNumber());
- }
-
- @Override
- public int hashCode() {
- int hash = 17;
-
- if (streamName != null) {
- hash = 37 * hash + streamName.hashCode();
- }
- if (shardId != null) {
- hash = 37 * hash + shardId.hashCode();
- }
- if (parentShardId != null) {
- hash = 37 * hash + parentShardId.hashCode();
- }
- if (adjacentParentShardId != null) {
- hash = 37 * hash + adjacentParentShardId.hashCode();
- }
- if (startingHashKey != null) {
- hash = 37 * hash + startingHashKey.hashCode();
- }
- if (endingHashKey != null) {
- hash = 37 * hash + endingHashKey.hashCode();
- }
- if (startingSequenceNumber != null) {
- hash = 37 * hash + startingSequenceNumber.hashCode();
- }
- if (endingSequenceNumber != null) {
- hash = 37 * hash + endingSequenceNumber.hashCode();
- }
-
- return hash;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/2597e7e1/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardMetadata.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardMetadata.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardMetadata.java
new file mode 100644
index 0000000..a158a8b
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardMetadata.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.model;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * A serializable representation of an AWS Kinesis Stream shard. It is basically a wrapper class around the information
+ * disintegrated from {@link com.amazonaws.services.kinesis.model.Shard} and its nested classes. The disintegration
+ * is required to avoid being locked-in to a specific AWS SDK version in order to maintain the consumer's state
+ * backwards compatibility.
+ */
+public class StreamShardMetadata implements Serializable {
+
+ private static final long serialVersionUID = 5134869582298563604L;
+
+ private String streamName;
+ private String shardId;
+ private String parentShardId;
+ private String adjacentParentShardId;
+ private String startingHashKey;
+ private String endingHashKey;
+ private String startingSequenceNumber;
+ private String endingSequenceNumber;
+
+ public void setStreamName(String streamName) {
+ this.streamName = streamName;
+ }
+
+ public void setShardId(String shardId) {
+ this.shardId = shardId;
+ }
+
+ public void setParentShardId(String parentShardId) {
+ this.parentShardId = parentShardId;
+ }
+
+ public void setAdjacentParentShardId(String adjacentParentShardId) {
+ this.adjacentParentShardId = adjacentParentShardId;
+ }
+
+ public void setStartingHashKey(String startingHashKey) {
+ this.startingHashKey = startingHashKey;
+ }
+
+ public void setEndingHashKey(String endingHashKey) {
+ this.endingHashKey = endingHashKey;
+ }
+
+ public void setStartingSequenceNumber(String startingSequenceNumber) {
+ this.startingSequenceNumber = startingSequenceNumber;
+ }
+
+ public void setEndingSequenceNumber(String endingSequenceNumber) {
+ this.endingSequenceNumber = endingSequenceNumber;
+ }
+
+ public String getStreamName() {
+ return this.streamName;
+ }
+
+ public String getShardId() {
+ return this.shardId;
+ }
+
+ public String getParentShardId() {
+ return this.parentShardId;
+ }
+
+ public String getAdjacentParentShardId() {
+ return this.adjacentParentShardId;
+ }
+
+ public String getStartingHashKey() {
+ return this.startingHashKey;
+ }
+
+ public String getEndingHashKey() {
+ return this.endingHashKey;
+ }
+
+ public String getStartingSequenceNumber() {
+ return this.startingSequenceNumber;
+ }
+
+ public String getEndingSequenceNumber() {
+ return this.endingSequenceNumber;
+ }
+
+ @Override
+ public String toString() {
+ return "StreamShardMetadata{" +
+ "streamName='" + streamName + "'" +
+ ", shardId='" + shardId + "'" +
+ ", parentShardId='" + parentShardId + "'" +
+ ", adjacentParentShardId='" + adjacentParentShardId + "'" +
+ ", startingHashKey='" + startingHashKey + "'" +
+ ", endingHashKey='" + endingHashKey + "'" +
+ ", startingSequenceNumber='" + startingSequenceNumber + "'" +
+ ", endingSequenceNumber='" + endingSequenceNumber + "'}";
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof StreamShardMetadata)) {
+ return false;
+ }
+
+ if (obj == this) {
+ return true;
+ }
+
+ StreamShardMetadata other = (StreamShardMetadata) obj;
+
+ return streamName.equals(other.getStreamName()) &&
+ shardId.equals(other.getShardId()) &&
+ Objects.equals(parentShardId, other.getParentShardId()) &&
+ Objects.equals(adjacentParentShardId, other.getAdjacentParentShardId()) &&
+ Objects.equals(startingHashKey, other.getStartingHashKey()) &&
+ Objects.equals(endingHashKey, other.getEndingHashKey()) &&
+ Objects.equals(startingSequenceNumber, other.getStartingSequenceNumber()) &&
+ Objects.equals(endingSequenceNumber, other.getEndingSequenceNumber());
+ }
+
+ @Override
+ public int hashCode() {
+ int hash = 17;
+
+ if (streamName != null) {
+ hash = 37 * hash + streamName.hashCode();
+ }
+ if (shardId != null) {
+ hash = 37 * hash + shardId.hashCode();
+ }
+ if (parentShardId != null) {
+ hash = 37 * hash + parentShardId.hashCode();
+ }
+ if (adjacentParentShardId != null) {
+ hash = 37 * hash + adjacentParentShardId.hashCode();
+ }
+ if (startingHashKey != null) {
+ hash = 37 * hash + startingHashKey.hashCode();
+ }
+ if (endingHashKey != null) {
+ hash = 37 * hash + endingHashKey.hashCode();
+ }
+ if (startingSequenceNumber != null) {
+ hash = 37 * hash + startingSequenceNumber.hashCode();
+ }
+ if (endingSequenceNumber != null) {
+ hash = 37 * hash + endingSequenceNumber.hashCode();
+ }
+
+ return hash;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2597e7e1/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
index d2af6ad..e24a411 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardV2;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
@@ -102,8 +102,8 @@ public class FlinkKinesisConsumerMigrationTest {
testHarness.open();
// the expected state in "kafka-consumer-migration-test-flink1.1-snapshot"
- final HashMap<KinesisStreamShardV2, SequenceNumber> expectedState = new HashMap<>();
- expectedState.put(FlinkKinesisConsumer.createKinesisStreamShardV2(new KinesisStreamShard("fakeStream1",
+ final HashMap<StreamShardMetadata, SequenceNumber> expectedState = new HashMap<>();
+ expectedState.put(KinesisStreamShard.convertToStreamShardMetadata(new KinesisStreamShard("fakeStream1",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)))),
new SequenceNumber("987654321"));
http://git-wip-us.apache.org/repos/asf/flink/blob/2597e7e1/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index 760858a..186dfa6 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -41,7 +41,7 @@ import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardV2;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer;
@@ -543,26 +543,26 @@ public class FlinkKinesisConsumerTest {
config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
- List<Tuple2<KinesisStreamShardV2, SequenceNumber>> globalUnionState = new ArrayList<>(4);
+ List<Tuple2<StreamShardMetadata, SequenceNumber>> globalUnionState = new ArrayList<>(4);
globalUnionState.add(Tuple2.of(
- KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream",
+ KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)))),
new SequenceNumber("1")));
globalUnionState.add(Tuple2.of(
- KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream",
+ KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1)))),
new SequenceNumber("1")));
globalUnionState.add(Tuple2.of(
- KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream",
+ KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2)))),
new SequenceNumber("1")));
globalUnionState.add(Tuple2.of(
- KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream",
+ KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(3)))),
new SequenceNumber("1")));
- TestingListState<Tuple2<KinesisStreamShardV2, SequenceNumber>> listState = new TestingListState<>();
- for (Tuple2<KinesisStreamShardV2, SequenceNumber> state : globalUnionState) {
+ TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = new TestingListState<>();
+ for (Tuple2<StreamShardMetadata, SequenceNumber> state : globalUnionState) {
listState.add(state);
}
@@ -606,23 +606,23 @@ public class FlinkKinesisConsumerTest {
config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
- ArrayList<Tuple2<KinesisStreamShardV2, SequenceNumber>> initialState = new ArrayList<>(1);
+ ArrayList<Tuple2<StreamShardMetadata, SequenceNumber>> initialState = new ArrayList<>(1);
initialState.add(Tuple2.of(
- KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream1",
+ KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream1",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)))),
new SequenceNumber("1")));
- ArrayList<Tuple2<KinesisStreamShardV2, SequenceNumber>> expectedStateSnapshot = new ArrayList<>(3);
+ ArrayList<Tuple2<StreamShardMetadata, SequenceNumber>> expectedStateSnapshot = new ArrayList<>(3);
expectedStateSnapshot.add(Tuple2.of(
- KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream1",
+ KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream1",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)))),
new SequenceNumber("12")));
expectedStateSnapshot.add(Tuple2.of(
- KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream1",
+ KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream1",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1)))),
new SequenceNumber("11")));
expectedStateSnapshot.add(Tuple2.of(
- KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream1",
+ KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream1",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2)))),
new SequenceNumber("31")));
@@ -630,8 +630,8 @@ public class FlinkKinesisConsumerTest {
// mock operator state backend and initial state for initializeState()
// ----------------------------------------------------------------------
- TestingListState<Tuple2<KinesisStreamShardV2, SequenceNumber>> listState = new TestingListState<>();
- for (Tuple2<KinesisStreamShardV2, SequenceNumber> state : initialState) {
+ TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = new TestingListState<>();
+ for (Tuple2<StreamShardMetadata, SequenceNumber> state : initialState) {
listState.add(state);
}
@@ -646,8 +646,8 @@ public class FlinkKinesisConsumerTest {
// mock a running fetcher and its state for snapshot
// ----------------------------------------------------------------------
- HashMap<KinesisStreamShardV2, SequenceNumber> stateSnapshot = new HashMap<>();
- for (Tuple2<KinesisStreamShardV2, SequenceNumber> tuple : expectedStateSnapshot) {
+ HashMap<StreamShardMetadata, SequenceNumber> stateSnapshot = new HashMap<>();
+ for (Tuple2<StreamShardMetadata, SequenceNumber> tuple : expectedStateSnapshot) {
stateSnapshot.put(tuple.f0, tuple.f1);
}
@@ -674,15 +674,15 @@ public class FlinkKinesisConsumerTest {
assertEquals(true, listState.clearCalled);
assertEquals(3, listState.getList().size());
- for (Tuple2<KinesisStreamShardV2, SequenceNumber> state : initialState) {
- for (Tuple2<KinesisStreamShardV2, SequenceNumber> currentState : listState.getList()) {
+ for (Tuple2<StreamShardMetadata, SequenceNumber> state : initialState) {
+ for (Tuple2<StreamShardMetadata, SequenceNumber> currentState : listState.getList()) {
assertNotEquals(state, currentState);
}
}
- for (Tuple2<KinesisStreamShardV2, SequenceNumber> state : expectedStateSnapshot) {
+ for (Tuple2<StreamShardMetadata, SequenceNumber> state : expectedStateSnapshot) {
boolean hasOneIsSame = false;
- for (Tuple2<KinesisStreamShardV2, SequenceNumber> currentState : listState.getList()) {
+ for (Tuple2<StreamShardMetadata, SequenceNumber> currentState : listState.getList()) {
hasOneIsSame = hasOneIsSame || state.equals(currentState);
}
assertEquals(true, hasOneIsSame);
@@ -736,7 +736,7 @@ public class FlinkKinesisConsumerTest {
for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
- new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredShard.getKey()),
+ new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredShard.getKey()),
restoredShard.getKey(), restoredShard.getValue()));
}
}
@@ -755,9 +755,9 @@ public class FlinkKinesisConsumerTest {
// mock operator state backend and initial state for initializeState()
// ----------------------------------------------------------------------
- TestingListState<Tuple2<KinesisStreamShardV2, SequenceNumber>> listState = new TestingListState<>();
+ TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = new TestingListState<>();
for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredState.entrySet()) {
- listState.add(Tuple2.of(KinesisDataFetcher.createKinesisStreamShardV2(state.getKey()), state.getValue()));
+ listState.add(Tuple2.of(KinesisDataFetcher.convertToStreamShardMetadata(state.getKey()), state.getValue()));
}
OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
@@ -793,7 +793,7 @@ public class FlinkKinesisConsumerTest {
for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
- new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredShard.getKey()),
+ new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredShard.getKey()),
restoredShard.getKey(), restoredShard.getValue()));
}
}
@@ -814,12 +814,12 @@ public class FlinkKinesisConsumerTest {
// mock operator state backend and initial state for initializeState()
// ----------------------------------------------------------------------
- TestingListState<Tuple2<KinesisStreamShardV2, SequenceNumber>> listState = new TestingListState<>();
+ TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = new TestingListState<>();
for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredState.entrySet()) {
- listState.add(Tuple2.of(KinesisDataFetcher.createKinesisStreamShardV2(state.getKey()), state.getValue()));
+ listState.add(Tuple2.of(KinesisDataFetcher.convertToStreamShardMetadata(state.getKey()), state.getValue()));
}
for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredStateForOthers.entrySet()) {
- listState.add(Tuple2.of(KinesisDataFetcher.createKinesisStreamShardV2(state.getKey()), state.getValue()));
+ listState.add(Tuple2.of(KinesisDataFetcher.convertToStreamShardMetadata(state.getKey()), state.getValue()));
}
OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
@@ -856,13 +856,13 @@ public class FlinkKinesisConsumerTest {
for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredStateForOthers.entrySet()) {
// should never get restored state not belonging to itself
Mockito.verify(mockedFetcher, never()).registerNewSubscribedShardState(
- new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredShard.getKey()),
+ new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredShard.getKey()),
restoredShard.getKey(), restoredShard.getValue()));
}
for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
// should get restored state belonging to itself
Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
- new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredShard.getKey()),
+ new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredShard.getKey()),
restoredShard.getKey(), restoredShard.getValue()));
}
}
@@ -910,9 +910,9 @@ public class FlinkKinesisConsumerTest {
// mock operator state backend and initial state for initializeState()
// ----------------------------------------------------------------------
- TestingListState<Tuple2<KinesisStreamShardV2, SequenceNumber>> listState = new TestingListState<>();
+ TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = new TestingListState<>();
for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredState.entrySet()) {
- listState.add(Tuple2.of(KinesisDataFetcher.createKinesisStreamShardV2(state.getKey()), state.getValue()));
+ listState.add(Tuple2.of(KinesisDataFetcher.convertToStreamShardMetadata(state.getKey()), state.getValue()));
}
OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
@@ -953,13 +953,13 @@ public class FlinkKinesisConsumerTest {
SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
- new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredShard.getKey()),
+ new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredShard.getKey()),
restoredShard.getKey(), restoredShard.getValue()));
}
}
@Test
- public void testCreateFunctionToConvertBetweenKinesisStreamShardAndKinesisStreamShardV2() {
+ public void testLegacyKinesisStreamShardToStreamShardMetadataConversion() {
String streamName = "fakeStream1";
String shardId = "shard-000001";
String parentShardId = "shard-000002";
@@ -969,15 +969,15 @@ public class FlinkKinesisConsumerTest {
String startingSequenceNumber = "seq-0000021";
String endingSequenceNumber = "seq-00000031";
- KinesisStreamShardV2 kinesisStreamShardV2 = new KinesisStreamShardV2();
- kinesisStreamShardV2.setStreamName(streamName);
- kinesisStreamShardV2.setShardId(shardId);
- kinesisStreamShardV2.setParentShardId(parentShardId);
- kinesisStreamShardV2.setAdjacentParentShardId(adjacentParentShardId);
- kinesisStreamShardV2.setStartingHashKey(startingHashKey);
- kinesisStreamShardV2.setEndingHashKey(endingHashKey);
- kinesisStreamShardV2.setStartingSequenceNumber(startingSequenceNumber);
- kinesisStreamShardV2.setEndingSequenceNumber(endingSequenceNumber);
+ StreamShardMetadata streamShardMetadata = new StreamShardMetadata();
+ streamShardMetadata.setStreamName(streamName);
+ streamShardMetadata.setShardId(shardId);
+ streamShardMetadata.setParentShardId(parentShardId);
+ streamShardMetadata.setAdjacentParentShardId(adjacentParentShardId);
+ streamShardMetadata.setStartingHashKey(startingHashKey);
+ streamShardMetadata.setEndingHashKey(endingHashKey);
+ streamShardMetadata.setStartingSequenceNumber(startingSequenceNumber);
+ streamShardMetadata.setEndingSequenceNumber(endingSequenceNumber);
Shard shard = new Shard()
.withShardId(shardId)
@@ -991,12 +991,12 @@ public class FlinkKinesisConsumerTest {
.withEndingSequenceNumber(endingSequenceNumber));
KinesisStreamShard kinesisStreamShard = new KinesisStreamShard(streamName, shard);
- assertEquals(kinesisStreamShardV2, FlinkKinesisConsumer.createKinesisStreamShardV2(kinesisStreamShard));
+ assertEquals(streamShardMetadata, KinesisStreamShard.convertToStreamShardMetadata(kinesisStreamShard));
}
@Test
- public void testKinesisStreamShardV2WillUsePojoSerializer() {
- TypeInformation<KinesisStreamShardV2> typeInformation = TypeInformation.of(KinesisStreamShardV2.class);
+ public void testStreamShardMetadataSerializedUsingPojoSerializer() {
+ TypeInformation<StreamShardMetadata> typeInformation = TypeInformation.of(StreamShardMetadata.class);
assertTrue(typeInformation.createSerializer(new ExecutionConfig()) instanceof PojoSerializer);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2597e7e1/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
index 7c36945..4fb6dd4 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConsta
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardV2;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisBehavioursFactory;
import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
@@ -205,7 +205,7 @@ public class KinesisDataFetcherTest {
for (Map.Entry<StreamShardHandle, String> restoredState : restoredStateUnderTest.entrySet()) {
fetcher.advanceLastDiscoveredShardOfStream(restoredState.getKey().getStreamName(), restoredState.getKey().getShard().getShardId());
fetcher.registerNewSubscribedShardState(
- new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredState.getKey()),
+ new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredState.getKey()),
restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
}
@@ -296,7 +296,7 @@ public class KinesisDataFetcherTest {
for (Map.Entry<StreamShardHandle, String> restoredState : restoredStateUnderTest.entrySet()) {
fetcher.advanceLastDiscoveredShardOfStream(restoredState.getKey().getStreamName(), restoredState.getKey().getShard().getShardId());
fetcher.registerNewSubscribedShardState(
- new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredState.getKey()),
+ new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredState.getKey()),
restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
}
@@ -391,7 +391,7 @@ public class KinesisDataFetcherTest {
for (Map.Entry<StreamShardHandle, String> restoredState : restoredStateUnderTest.entrySet()) {
fetcher.advanceLastDiscoveredShardOfStream(restoredState.getKey().getStreamName(), restoredState.getKey().getShard().getShardId());
fetcher.registerNewSubscribedShardState(
- new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredState.getKey()),
+ new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredState.getKey()),
restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
}
@@ -487,7 +487,7 @@ public class KinesisDataFetcherTest {
for (Map.Entry<StreamShardHandle, String> restoredState : restoredStateUnderTest.entrySet()) {
fetcher.advanceLastDiscoveredShardOfStream(restoredState.getKey().getStreamName(), restoredState.getKey().getShard().getShardId());
fetcher.registerNewSubscribedShardState(
- new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredState.getKey()),
+ new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredState.getKey()),
restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
}
@@ -521,7 +521,7 @@ public class KinesisDataFetcherTest {
}
@Test
- public void testCreateFunctionToConvertBetweenKinesisStreamShardV2AndStreamShardHandle() {
+ public void testStreamShardMetadataAndHandleConversion() {
String streamName = "fakeStream1";
String shardId = "shard-000001";
String parentShardId = "shard-000002";
@@ -531,7 +531,7 @@ public class KinesisDataFetcherTest {
String startingSequenceNumber = "seq-0000021";
String endingSequenceNumber = "seq-00000031";
- KinesisStreamShardV2 kinesisStreamShard = new KinesisStreamShardV2();
+ StreamShardMetadata kinesisStreamShard = new StreamShardMetadata();
kinesisStreamShard.setStreamName(streamName);
kinesisStreamShard.setShardId(shardId);
kinesisStreamShard.setParentShardId(parentShardId);
@@ -553,8 +553,8 @@ public class KinesisDataFetcherTest {
.withEndingSequenceNumber(endingSequenceNumber));
StreamShardHandle streamShardHandle = new StreamShardHandle(streamName, shard);
- assertEquals(kinesisStreamShard, KinesisDataFetcher.createKinesisStreamShardV2(streamShardHandle));
- assertEquals(streamShardHandle, KinesisDataFetcher.createStreamShardHandle(kinesisStreamShard));
+ assertEquals(kinesisStreamShard, KinesisDataFetcher.convertToStreamShardMetadata(streamShardHandle));
+ assertEquals(streamShardHandle, KinesisDataFetcher.convertToStreamShardHandle(kinesisStreamShard));
}
private static class DummyFlinkKafkaConsumer<T> extends FlinkKinesisConsumer<T> {
http://git-wip-us.apache.org/repos/asf/flink/blob/2597e7e1/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
index 4e06329..b22ba0c 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
@@ -54,7 +54,7 @@ public class ShardConsumerTest {
LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
subscribedShardsStateUnderTest.add(
- new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(fakeToBeConsumedShard),
+ new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(fakeToBeConsumedShard),
fakeToBeConsumedShard, new SequenceNumber("fakeStartingState")));
TestableKinesisDataFetcher fetcher =
@@ -93,7 +93,7 @@ public class ShardConsumerTest {
LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
subscribedShardsStateUnderTest.add(
- new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(fakeToBeConsumedShard),
+ new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(fakeToBeConsumedShard),
fakeToBeConsumedShard, new SequenceNumber("fakeStartingState")));
TestableKinesisDataFetcher fetcher =