You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/05/26 08:28:00 UTC

[8/8] flink git commit: [FLINK-6653] [kinesis] Improvements to removal of AWS's Shard class in checkpoints

[FLINK-6653] [kinesis] Improvements to removal of AWS's Shard class in checkpoints

This closes #3994.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2597e7e1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2597e7e1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2597e7e1

Branch: refs/heads/master
Commit: 2597e7e1803da66190ff545e705a6a4e6a6f76a2
Parents: 913be2f
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri May 26 15:56:54 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri May 26 16:27:19 2017 +0800

----------------------------------------------------------------------
 .../kinesis/FlinkKinesisConsumer.java           |  55 ++----
 .../kinesis/internals/KinesisDataFetcher.java   |  74 ++++----
 .../kinesis/model/KinesisStreamShard.java       |  27 +++
 .../kinesis/model/KinesisStreamShardState.java  |  30 ++--
 .../kinesis/model/KinesisStreamShardV2.java     | 171 ------------------
 .../kinesis/model/StreamShardMetadata.java      | 173 +++++++++++++++++++
 .../FlinkKinesisConsumerMigrationTest.java      |   6 +-
 .../kinesis/FlinkKinesisConsumerTest.java       |  94 +++++-----
 .../internals/KinesisDataFetcherTest.java       |  18 +-
 .../kinesis/internals/ShardConsumerTest.java    |   4 +-
 10 files changed, 333 insertions(+), 319 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2597e7e1/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index b7f5506..ea76ccc 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -39,7 +39,7 @@ import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardSta
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardV2;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
@@ -100,7 +100,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 	private transient KinesisDataFetcher<T> fetcher;
 
 	/** The sequence numbers to restore to upon restore from failure */
-	private transient HashMap<KinesisStreamShardV2, SequenceNumber> sequenceNumsToRestore;
+	private transient HashMap<StreamShardMetadata, SequenceNumber> sequenceNumsToRestore;
 
 	private volatile boolean running = true;
 
@@ -111,7 +111,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 	/** State name to access shard sequence number states; cannot be changed */
 	private static final String sequenceNumsStateStoreName = "Kinesis-Stream-Shard-State";
 
-	private transient ListState<Tuple2<KinesisStreamShardV2, SequenceNumber>> sequenceNumsStateForCheckpoint;
+	private transient ListState<Tuple2<StreamShardMetadata, SequenceNumber>> sequenceNumsStateForCheckpoint;
 
 	// ------------------------------------------------------------------------
 	//  Constructors
@@ -202,7 +202,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 		List<StreamShardHandle> allShards = fetcher.discoverNewShardsToSubscribe();
 
 		for (StreamShardHandle shard : allShards) {
-			KinesisStreamShardV2 kinesisStreamShard = KinesisDataFetcher.createKinesisStreamShardV2(shard);
+			StreamShardMetadata kinesisStreamShard = KinesisDataFetcher.convertToStreamShardMetadata(shard);
 			if (sequenceNumsToRestore != null) {
 				if (sequenceNumsToRestore.containsKey(kinesisStreamShard)) {
 					// if the shard was already seen and is contained in the state,
@@ -298,8 +298,8 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 
 	@Override
 	public void initializeState(FunctionInitializationContext context) throws Exception {
-		TypeInformation<Tuple2<KinesisStreamShardV2, SequenceNumber>> shardsStateTypeInfo = new TupleTypeInfo<>(
-			TypeInformation.of(KinesisStreamShardV2.class),
+		TypeInformation<Tuple2<StreamShardMetadata, SequenceNumber>> shardsStateTypeInfo = new TupleTypeInfo<>(
+			TypeInformation.of(StreamShardMetadata.class),
 			TypeInformation.of(SequenceNumber.class));
 
 		sequenceNumsStateForCheckpoint = context.getOperatorStateStore().getUnionListState(
@@ -308,7 +308,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 		if (context.isRestored()) {
 			if (sequenceNumsToRestore == null) {
 				sequenceNumsToRestore = new HashMap<>();
-				for (Tuple2<KinesisStreamShardV2, SequenceNumber> kinesisSequenceNumber : sequenceNumsStateForCheckpoint.get()) {
+				for (Tuple2<StreamShardMetadata, SequenceNumber> kinesisSequenceNumber : sequenceNumsStateForCheckpoint.get()) {
 					sequenceNumsToRestore.put(kinesisSequenceNumber.f0, kinesisSequenceNumber.f1);
 				}
 
@@ -333,12 +333,12 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 
 			if (fetcher == null) {
 				if (sequenceNumsToRestore != null) {
-					for (Map.Entry<KinesisStreamShardV2, SequenceNumber> entry : sequenceNumsToRestore.entrySet()) {
+					for (Map.Entry<StreamShardMetadata, SequenceNumber> entry : sequenceNumsToRestore.entrySet()) {
 						// sequenceNumsToRestore is the restored global union state;
 						// should only snapshot shards that actually belong to us
 
 						if (KinesisDataFetcher.isThisSubtaskShouldSubscribeTo(
-								KinesisDataFetcher.createStreamShardHandle(entry.getKey()),
+								KinesisDataFetcher.convertToStreamShardHandle(entry.getKey()),
 								getRuntimeContext().getNumberOfParallelSubtasks(),
 								getRuntimeContext().getIndexOfThisSubtask())) {
 
@@ -347,14 +347,14 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 					}
 				}
 			} else {
-				HashMap<KinesisStreamShardV2, SequenceNumber> lastStateSnapshot = fetcher.snapshotState();
+				HashMap<StreamShardMetadata, SequenceNumber> lastStateSnapshot = fetcher.snapshotState();
 
 				if (LOG.isDebugEnabled()) {
 					LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
 						lastStateSnapshot.toString(), context.getCheckpointId(), context.getCheckpointTimestamp());
 				}
 
-				for (Map.Entry<KinesisStreamShardV2, SequenceNumber> entry : lastStateSnapshot.entrySet()) {
+				for (Map.Entry<StreamShardMetadata, SequenceNumber> entry : lastStateSnapshot.entrySet()) {
 					sequenceNumsStateForCheckpoint.add(Tuple2.of(entry.getKey(), entry.getValue()));
 				}
 			}
@@ -370,8 +370,10 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 			sequenceNumsToRestore = null;
 		} else {
 			sequenceNumsToRestore = new HashMap<>();
-			for (Map.Entry<KinesisStreamShard, SequenceNumber> kv: restoredState.entrySet()) {
-				sequenceNumsToRestore.put(createKinesisStreamShardV2(kv.getKey()), kv.getValue());
+			for (Map.Entry<KinesisStreamShard, SequenceNumber> stateEntry : restoredState.entrySet()) {
+				sequenceNumsToRestore.put(
+						KinesisStreamShard.convertToStreamShardMetadata(stateEntry.getKey()),
+						stateEntry.getValue());
 			}
 		}
 	}
@@ -388,32 +390,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 	}
 
 	@VisibleForTesting
-	HashMap<KinesisStreamShardV2, SequenceNumber> getRestoredState() {
+	HashMap<StreamShardMetadata, SequenceNumber> getRestoredState() {
 		return sequenceNumsToRestore;
 	}
-
-	/**
-	 * Utility function to convert {@link KinesisStreamShard} into {@link KinesisStreamShardV2}
-	 *
-	 * @param kinesisStreamShard the {@link KinesisStreamShard} to be converted
-	 * @return a {@link KinesisStreamShardV2} object
-	 */
-	public static KinesisStreamShardV2 createKinesisStreamShardV2(KinesisStreamShard kinesisStreamShard) {
-		KinesisStreamShardV2 kinesisStreamShardV2 = new KinesisStreamShardV2();
-
-		kinesisStreamShardV2.setStreamName(kinesisStreamShard.getStreamName());
-		kinesisStreamShardV2.setShardId(kinesisStreamShard.getShard().getShardId());
-		kinesisStreamShardV2.setParentShardId(kinesisStreamShard.getShard().getParentShardId());
-		kinesisStreamShardV2.setAdjacentParentShardId(kinesisStreamShard.getShard().getAdjacentParentShardId());
-		if (kinesisStreamShard.getShard().getHashKeyRange() != null) {
-			kinesisStreamShardV2.setStartingHashKey(kinesisStreamShard.getShard().getHashKeyRange().getStartingHashKey());
-			kinesisStreamShardV2.setEndingHashKey(kinesisStreamShard.getShard().getHashKeyRange().getEndingHashKey());
-		}
-		if (kinesisStreamShard.getShard().getSequenceNumberRange() != null) {
-			kinesisStreamShardV2.setStartingSequenceNumber(kinesisStreamShard.getShard().getSequenceNumberRange().getStartingSequenceNumber());
-			kinesisStreamShardV2.setEndingSequenceNumber(kinesisStreamShard.getShard().getSequenceNumberRange().getEndingSequenceNumber());
-		}
-
-		return kinesisStreamShardV2;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2597e7e1/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index b0dceec..11ac6d4 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardV2;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
 import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
@@ -303,7 +303,7 @@ public class KinesisDataFetcher<T> {
 				// since there may be delay in discovering a new shard, all new shards due to
 				// resharding should be read starting from the earliest record possible
 				KinesisStreamShardState newShardState =
-					new KinesisStreamShardState(createKinesisStreamShardV2(shard), shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
+					new KinesisStreamShardState(convertToStreamShardMetadata(shard), shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
 				int newStateIndex = registerNewSubscribedShardState(newShardState);
 
 				if (LOG.isInfoEnabled()) {
@@ -353,13 +353,13 @@ public class KinesisDataFetcher<T> {
 	 *
 	 * @return state snapshot
 	 */
-	public HashMap<KinesisStreamShardV2, SequenceNumber> snapshotState() {
+	public HashMap<StreamShardMetadata, SequenceNumber> snapshotState() {
 		// this method assumes that the checkpoint lock is held
 		assert Thread.holdsLock(checkpointLock);
 
-		HashMap<KinesisStreamShardV2, SequenceNumber> stateSnapshot = new HashMap<>();
+		HashMap<StreamShardMetadata, SequenceNumber> stateSnapshot = new HashMap<>();
 		for (KinesisStreamShardState shardWithState : subscribedShardsState) {
-			stateSnapshot.put(shardWithState.getKinesisStreamShard(), shardWithState.getLastProcessedSequenceNum());
+			stateSnapshot.put(shardWithState.getStreamShardMetadata(), shardWithState.getLastProcessedSequenceNum());
 		}
 		return stateSnapshot;
 	}
@@ -588,56 +588,54 @@ public class KinesisDataFetcher<T> {
 	}
 
 	/**
-	 * Utility function to convert {@link StreamShardHandle} into {@link KinesisStreamShardV2}
+	 * Utility function to convert {@link StreamShardHandle} into {@link StreamShardMetadata}
 	 *
 	 * @param streamShardHandle the {@link StreamShardHandle} to be converted
-	 * @return a {@link KinesisStreamShardV2} object
+	 * @return a {@link StreamShardMetadata} object
 	 */
-	public static KinesisStreamShardV2 createKinesisStreamShardV2(StreamShardHandle streamShardHandle) {
-		KinesisStreamShardV2 kinesisStreamShardV2 = new KinesisStreamShardV2();
+	public static StreamShardMetadata convertToStreamShardMetadata(StreamShardHandle streamShardHandle) {
+		StreamShardMetadata streamShardMetadata = new StreamShardMetadata();
+
+		streamShardMetadata.setStreamName(streamShardHandle.getStreamName());
+		streamShardMetadata.setShardId(streamShardHandle.getShard().getShardId());
+		streamShardMetadata.setParentShardId(streamShardHandle.getShard().getParentShardId());
+		streamShardMetadata.setAdjacentParentShardId(streamShardHandle.getShard().getAdjacentParentShardId());
 
-		kinesisStreamShardV2.setStreamName(streamShardHandle.getStreamName());
-		kinesisStreamShardV2.setShardId(streamShardHandle.getShard().getShardId());
-		kinesisStreamShardV2.setParentShardId(streamShardHandle.getShard().getParentShardId());
-		kinesisStreamShardV2.setAdjacentParentShardId(streamShardHandle.getShard().getAdjacentParentShardId());
 		if (streamShardHandle.getShard().getHashKeyRange() != null) {
-			kinesisStreamShardV2.setStartingHashKey(streamShardHandle.getShard().getHashKeyRange().getStartingHashKey());
-			kinesisStreamShardV2.setEndingHashKey(streamShardHandle.getShard().getHashKeyRange().getEndingHashKey());
+			streamShardMetadata.setStartingHashKey(streamShardHandle.getShard().getHashKeyRange().getStartingHashKey());
+			streamShardMetadata.setEndingHashKey(streamShardHandle.getShard().getHashKeyRange().getEndingHashKey());
 		}
+
 		if (streamShardHandle.getShard().getSequenceNumberRange() != null) {
-			kinesisStreamShardV2.setStartingSequenceNumber(streamShardHandle.getShard().getSequenceNumberRange().getStartingSequenceNumber());
-			kinesisStreamShardV2.setEndingSequenceNumber(streamShardHandle.getShard().getSequenceNumberRange().getEndingSequenceNumber());
+			streamShardMetadata.setStartingSequenceNumber(streamShardHandle.getShard().getSequenceNumberRange().getStartingSequenceNumber());
+			streamShardMetadata.setEndingSequenceNumber(streamShardHandle.getShard().getSequenceNumberRange().getEndingSequenceNumber());
 		}
 
-		return kinesisStreamShardV2;
+		return streamShardMetadata;
 	}
 
 	/**
-	 * Utility function to convert {@link KinesisStreamShardV2} into {@link StreamShardHandle}
+	 * Utility function to convert {@link StreamShardMetadata} into {@link StreamShardHandle}
 	 *
-	 * @param kinesisStreamShard the {@link KinesisStreamShardV2} to be converted
+	 * @param streamShardMetadata the {@link StreamShardMetadata} to be converted
 	 * @return a {@link StreamShardHandle} object
 	 */
-	public static StreamShardHandle createStreamShardHandle(KinesisStreamShardV2 kinesisStreamShard) {
+	public static StreamShardHandle convertToStreamShardHandle(StreamShardMetadata streamShardMetadata) {
 		Shard shard = new Shard();
-		shard.withShardId(kinesisStreamShard.getShardId());
-		shard.withParentShardId(kinesisStreamShard.getParentShardId());
-		shard.withAdjacentParentShardId(kinesisStreamShard.getAdjacentParentShardId());
-
-		if (kinesisStreamShard.getStartingHashKey() != null && kinesisStreamShard.getEndingHashKey() != null) {
-			HashKeyRange hashKeyRange = new HashKeyRange();
-			hashKeyRange.withStartingHashKey(kinesisStreamShard.getStartingHashKey());
-			hashKeyRange.withEndingHashKey(kinesisStreamShard.getEndingHashKey());
-			shard.withHashKeyRange(hashKeyRange);
-		}
+		shard.withShardId(streamShardMetadata.getShardId());
+		shard.withParentShardId(streamShardMetadata.getParentShardId());
+		shard.withAdjacentParentShardId(streamShardMetadata.getAdjacentParentShardId());
 
-		if (kinesisStreamShard.getStartingSequenceNumber() != null && kinesisStreamShard.getEndingSequenceNumber() != null) {
-			SequenceNumberRange sequenceNumberRange = new SequenceNumberRange();
-			sequenceNumberRange.withStartingSequenceNumber(kinesisStreamShard.getStartingSequenceNumber());
-			sequenceNumberRange.withEndingSequenceNumber(kinesisStreamShard.getEndingSequenceNumber());
-			shard.withSequenceNumberRange(sequenceNumberRange);
-		}
+		HashKeyRange hashKeyRange = new HashKeyRange();
+		hashKeyRange.withStartingHashKey(streamShardMetadata.getStartingHashKey());
+		hashKeyRange.withEndingHashKey(streamShardMetadata.getEndingHashKey());
+		shard.withHashKeyRange(hashKeyRange);
+
+		SequenceNumberRange sequenceNumberRange = new SequenceNumberRange();
+		sequenceNumberRange.withStartingSequenceNumber(streamShardMetadata.getStartingSequenceNumber());
+		sequenceNumberRange.withEndingSequenceNumber(streamShardMetadata.getEndingSequenceNumber());
+		shard.withSequenceNumberRange(sequenceNumberRange);
 
-		return new StreamShardHandle(kinesisStreamShard.getStreamName(), shard);
+		return new StreamShardHandle(streamShardMetadata.getStreamName(), shard);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2597e7e1/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
index f3dcfe1..592e30d 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
@@ -129,4 +129,31 @@ public class KinesisStreamShard implements Serializable {
 		if (shardId == null) { return false; }
 		return shardId.matches("^shardId-\\d{12}");
 	}
+
+	/**
+	 * Utility function to convert {@link KinesisStreamShard} into the new {@link StreamShardMetadata} model.
+	 *
+	 * @param kinesisStreamShard the {@link KinesisStreamShard} to be converted
+	 * @return the converted {@link StreamShardMetadata}
+	 */
+	public static StreamShardMetadata convertToStreamShardMetadata(KinesisStreamShard kinesisStreamShard) {
+		StreamShardMetadata streamShardMetadata = new StreamShardMetadata();
+
+		streamShardMetadata.setStreamName(kinesisStreamShard.getStreamName());
+		streamShardMetadata.setShardId(kinesisStreamShard.getShard().getShardId());
+		streamShardMetadata.setParentShardId(kinesisStreamShard.getShard().getParentShardId());
+		streamShardMetadata.setAdjacentParentShardId(kinesisStreamShard.getShard().getAdjacentParentShardId());
+
+		if (kinesisStreamShard.getShard().getHashKeyRange() != null) {
+			streamShardMetadata.setStartingHashKey(kinesisStreamShard.getShard().getHashKeyRange().getStartingHashKey());
+			streamShardMetadata.setEndingHashKey(kinesisStreamShard.getShard().getHashKeyRange().getEndingHashKey());
+		}
+		
+		if (kinesisStreamShard.getShard().getSequenceNumberRange() != null) {
+			streamShardMetadata.setStartingSequenceNumber(kinesisStreamShard.getShard().getSequenceNumberRange().getStartingSequenceNumber());
+			streamShardMetadata.setEndingSequenceNumber(kinesisStreamShard.getShard().getSequenceNumberRange().getEndingSequenceNumber());
+		}
+
+		return streamShardMetadata;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2597e7e1/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
index e68129d..4b1cc1c 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
@@ -17,23 +17,33 @@
 
 package org.apache.flink.streaming.connectors.kinesis.model;
 
+import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.flink.util.Preconditions;
+
 /**
  * A wrapper class that bundles a {@link StreamShardHandle} with its last processed sequence number.
  */
 public class KinesisStreamShardState {
 
-	private KinesisStreamShardV2 kinesisStreamShard;
+	/** A handle object that wraps the actual {@link Shard} instance and stream name. */
 	private StreamShardHandle streamShardHandle;
+
+	/** The checkpointed state for each Kinesis stream shard. */
+	private StreamShardMetadata streamShardMetadata;
 	private SequenceNumber lastProcessedSequenceNum;
 
-	public KinesisStreamShardState(KinesisStreamShardV2 kinesisStreamShard, StreamShardHandle streamShardHandle, SequenceNumber lastProcessedSequenceNum) {
-		this.kinesisStreamShard = kinesisStreamShard;
-		this.streamShardHandle = streamShardHandle;
-		this.lastProcessedSequenceNum = lastProcessedSequenceNum;
+	public KinesisStreamShardState(
+			StreamShardMetadata streamShardMetadata,
+			StreamShardHandle streamShardHandle,
+			SequenceNumber lastProcessedSequenceNum) {
+
+		this.streamShardMetadata = Preconditions.checkNotNull(streamShardMetadata);
+		this.streamShardHandle = Preconditions.checkNotNull(streamShardHandle);
+		this.lastProcessedSequenceNum = Preconditions.checkNotNull(lastProcessedSequenceNum);
 	}
 
-	public KinesisStreamShardV2 getKinesisStreamShard() {
-		return this.kinesisStreamShard;
+	public StreamShardMetadata getStreamShardMetadata() {
+		return this.streamShardMetadata;
 	}
 
 	public StreamShardHandle getStreamShardHandle() {
@@ -51,7 +61,7 @@ public class KinesisStreamShardState {
 	@Override
 	public String toString() {
 		return "KinesisStreamShardState{" +
-			"kinesisStreamShard='" + kinesisStreamShard.toString() + "'" +
+			"streamShardMetadata='" + streamShardMetadata.toString() + "'" +
 			", streamShardHandle='" + streamShardHandle.toString() + "'" +
 			", lastProcessedSequenceNumber='" + lastProcessedSequenceNum.toString() + "'}";
 	}
@@ -68,13 +78,13 @@ public class KinesisStreamShardState {
 
 		KinesisStreamShardState other = (KinesisStreamShardState) obj;
 
-		return kinesisStreamShard.equals(other.getKinesisStreamShard()) &&
+		return streamShardMetadata.equals(other.getStreamShardMetadata()) &&
 			streamShardHandle.equals(other.getStreamShardHandle()) &&
 			lastProcessedSequenceNum.equals(other.getLastProcessedSequenceNum());
 	}
 
 	@Override
 	public int hashCode() {
-		return 37 * (kinesisStreamShard.hashCode() + streamShardHandle.hashCode() + lastProcessedSequenceNum.hashCode());
+		return 37 * (streamShardMetadata.hashCode() + streamShardHandle.hashCode() + lastProcessedSequenceNum.hashCode());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2597e7e1/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardV2.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardV2.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardV2.java
deleted file mode 100644
index 71cb6fa..0000000
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardV2.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.model;
-
-import java.io.Serializable;
-import java.util.Objects;
-
-/**
- * A serializable representation of a AWS Kinesis Stream shard. It is basically a wrapper class around the information
- * disintegrating from {@link com.amazonaws.services.kinesis.model.Shard} and its nested classes.
- */
-public class KinesisStreamShardV2 implements Serializable {
-
-	private static final long serialVersionUID = 5134869582298563604L;
-
-	private String streamName;
-	private String shardId;
-	private String parentShardId;
-	private String adjacentParentShardId;
-	private String startingHashKey;
-	private String endingHashKey;
-	private String startingSequenceNumber;
-	private String endingSequenceNumber;
-
-	public void setStreamName(String streamName) {
-		this.streamName = streamName;
-	}
-
-	public void setShardId(String shardId) {
-		this.shardId = shardId;
-	}
-
-	public void setParentShardId(String parentShardId) {
-		this.parentShardId = parentShardId;
-	}
-
-	public void setAdjacentParentShardId(String adjacentParentShardId) {
-		this.adjacentParentShardId = adjacentParentShardId;
-	}
-
-	public void setStartingHashKey(String startingHashKey) {
-		this.startingHashKey = startingHashKey;
-	}
-
-	public void setEndingHashKey(String endingHashKey) {
-		this.endingHashKey = endingHashKey;
-	}
-
-	public void setStartingSequenceNumber(String startingSequenceNumber) {
-		this.startingSequenceNumber = startingSequenceNumber;
-	}
-
-	public void setEndingSequenceNumber(String endingSequenceNumber) {
-		this.endingSequenceNumber = endingSequenceNumber;
-	}
-
-	public String getStreamName() {
-		return this.streamName;
-	}
-
-	public String getShardId() {
-		return this.shardId;
-	}
-
-	public String getParentShardId() {
-		return this.parentShardId;
-	}
-
-	public String getAdjacentParentShardId() {
-		return this.adjacentParentShardId;
-	}
-
-	public String getStartingHashKey() {
-		return this.startingHashKey;
-	}
-
-	public String getEndingHashKey() {
-		return this.endingHashKey;
-	}
-
-	public String getStartingSequenceNumber() {
-		return this.startingSequenceNumber;
-	}
-
-	public String getEndingSequenceNumber() {
-		return this.endingSequenceNumber;
-	}
-
-	@Override
-	public String toString() {
-		return "KinesisStreamShardV2{" +
-			"streamName='" + streamName + "'" +
-			", shardId='" + shardId + "'" +
-			", parentShardId='" + parentShardId + "'" +
-			", adjacentParentShardId='" + adjacentParentShardId + "'" +
-			", startingHashKey='" + startingHashKey + "'" +
-			", endingHashKey='" + endingHashKey + "'" +
-			", startingSequenceNumber='" + startingSequenceNumber + "'" +
-			", endingSequenceNumber='" + endingSequenceNumber + "'}";
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (!(obj instanceof KinesisStreamShardV2)) {
-			return false;
-		}
-
-		if (obj == this) {
-			return true;
-		}
-
-		KinesisStreamShardV2 other = (KinesisStreamShardV2) obj;
-
-		return streamName.equals(other.getStreamName()) &&
-			shardId.equals(other.getShardId()) &&
-			Objects.equals(parentShardId, other.getParentShardId()) &&
-			Objects.equals(adjacentParentShardId, other.getAdjacentParentShardId()) &&
-			Objects.equals(startingHashKey, other.getStartingHashKey()) &&
-			Objects.equals(endingHashKey, other.getEndingHashKey()) &&
-			Objects.equals(startingSequenceNumber, other.getStartingSequenceNumber()) &&
-			Objects.equals(endingSequenceNumber, other.getEndingSequenceNumber());
-	}
-
-	@Override
-	public int hashCode() {
-		int hash = 17;
-
-		if (streamName != null) {
-			hash = 37 * hash + streamName.hashCode();
-		}
-		if (shardId != null) {
-			hash = 37 * hash + shardId.hashCode();
-		}
-		if (parentShardId != null) {
-			hash = 37 * hash + parentShardId.hashCode();
-		}
-		if (adjacentParentShardId != null) {
-			hash = 37 * hash + adjacentParentShardId.hashCode();
-		}
-		if (startingHashKey != null) {
-			hash = 37 * hash + startingHashKey.hashCode();
-		}
-		if (endingHashKey != null) {
-			hash = 37 * hash + endingHashKey.hashCode();
-		}
-		if (startingSequenceNumber != null) {
-			hash = 37 * hash + startingSequenceNumber.hashCode();
-		}
-		if (endingSequenceNumber != null) {
-			hash = 37 * hash + endingSequenceNumber.hashCode();
-		}
-
-		return hash;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2597e7e1/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardMetadata.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardMetadata.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardMetadata.java
new file mode 100644
index 0000000..a158a8b
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardMetadata.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.model;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * A serializable representation of a AWS Kinesis Stream shard. It is basically a wrapper class around the information
+ * disintegrated from {@link com.amazonaws.services.kinesis.model.Shard} and its nested classes. The disintegration
+ * is required to avoid being locked-in to a specific AWS SDK version in order to maintain the consumer's state
+ * backwards compatibility.
+ */
+public class StreamShardMetadata implements Serializable {
+
+	private static final long serialVersionUID = 5134869582298563604L;
+
+	private String streamName;
+	private String shardId;
+	private String parentShardId;
+	private String adjacentParentShardId;
+	private String startingHashKey;
+	private String endingHashKey;
+	private String startingSequenceNumber;
+	private String endingSequenceNumber;
+
+	public void setStreamName(String streamName) {
+		this.streamName = streamName;
+	}
+
+	public void setShardId(String shardId) {
+		this.shardId = shardId;
+	}
+
+	public void setParentShardId(String parentShardId) {
+		this.parentShardId = parentShardId;
+	}
+
+	public void setAdjacentParentShardId(String adjacentParentShardId) {
+		this.adjacentParentShardId = adjacentParentShardId;
+	}
+
+	public void setStartingHashKey(String startingHashKey) {
+		this.startingHashKey = startingHashKey;
+	}
+
+	public void setEndingHashKey(String endingHashKey) {
+		this.endingHashKey = endingHashKey;
+	}
+
+	public void setStartingSequenceNumber(String startingSequenceNumber) {
+		this.startingSequenceNumber = startingSequenceNumber;
+	}
+
+	public void setEndingSequenceNumber(String endingSequenceNumber) {
+		this.endingSequenceNumber = endingSequenceNumber;
+	}
+
+	public String getStreamName() {
+		return this.streamName;
+	}
+
+	public String getShardId() {
+		return this.shardId;
+	}
+
+	public String getParentShardId() {
+		return this.parentShardId;
+	}
+
+	public String getAdjacentParentShardId() {
+		return this.adjacentParentShardId;
+	}
+
+	public String getStartingHashKey() {
+		return this.startingHashKey;
+	}
+
+	public String getEndingHashKey() {
+		return this.endingHashKey;
+	}
+
+	public String getStartingSequenceNumber() {
+		return this.startingSequenceNumber;
+	}
+
+	public String getEndingSequenceNumber() {
+		return this.endingSequenceNumber;
+	}
+
+	@Override
+	public String toString() {
+		return "StreamShardMetadata{" +
+			"streamName='" + streamName + "'" +
+			", shardId='" + shardId + "'" +
+			", parentShardId='" + parentShardId + "'" +
+			", adjacentParentShardId='" + adjacentParentShardId + "'" +
+			", startingHashKey='" + startingHashKey + "'" +
+			", endingHashKey='" + endingHashKey + "'" +
+			", startingSequenceNumber='" + startingSequenceNumber + "'" +
+			", endingSequenceNumber='" + endingSequenceNumber + "'}";
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (!(obj instanceof StreamShardMetadata)) {
+			return false;
+		}
+
+		if (obj == this) {
+			return true;
+		}
+
+		StreamShardMetadata other = (StreamShardMetadata) obj;
+
+		return streamName.equals(other.getStreamName()) &&
+			shardId.equals(other.getShardId()) &&
+			Objects.equals(parentShardId, other.getParentShardId()) &&
+			Objects.equals(adjacentParentShardId, other.getAdjacentParentShardId()) &&
+			Objects.equals(startingHashKey, other.getStartingHashKey()) &&
+			Objects.equals(endingHashKey, other.getEndingHashKey()) &&
+			Objects.equals(startingSequenceNumber, other.getStartingSequenceNumber()) &&
+			Objects.equals(endingSequenceNumber, other.getEndingSequenceNumber());
+	}
+
+	@Override
+	public int hashCode() {
+		int hash = 17;
+
+		if (streamName != null) {
+			hash = 37 * hash + streamName.hashCode();
+		}
+		if (shardId != null) {
+			hash = 37 * hash + shardId.hashCode();
+		}
+		if (parentShardId != null) {
+			hash = 37 * hash + parentShardId.hashCode();
+		}
+		if (adjacentParentShardId != null) {
+			hash = 37 * hash + adjacentParentShardId.hashCode();
+		}
+		if (startingHashKey != null) {
+			hash = 37 * hash + startingHashKey.hashCode();
+		}
+		if (endingHashKey != null) {
+			hash = 37 * hash + endingHashKey.hashCode();
+		}
+		if (startingSequenceNumber != null) {
+			hash = 37 * hash + startingSequenceNumber.hashCode();
+		}
+		if (endingSequenceNumber != null) {
+			hash = 37 * hash + endingSequenceNumber.hashCode();
+		}
+
+		return hash;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2597e7e1/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
index d2af6ad..e24a411 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardV2;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
@@ -102,8 +102,8 @@ public class FlinkKinesisConsumerMigrationTest {
 		testHarness.open();
 
 		// the expected state in "kafka-consumer-migration-test-flink1.1-snapshot"
-		final HashMap<KinesisStreamShardV2, SequenceNumber> expectedState = new HashMap<>();
-		expectedState.put(FlinkKinesisConsumer.createKinesisStreamShardV2(new KinesisStreamShard("fakeStream1",
+		final HashMap<StreamShardMetadata, SequenceNumber> expectedState = new HashMap<>();
+		expectedState.put(KinesisStreamShard.convertToStreamShardMetadata(new KinesisStreamShard("fakeStream1",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)))),
 			new SequenceNumber("987654321"));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2597e7e1/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index 760858a..186dfa6 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -41,7 +41,7 @@ import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardV2;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer;
@@ -543,26 +543,26 @@ public class FlinkKinesisConsumerTest {
 		config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
 		config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
 
-		List<Tuple2<KinesisStreamShardV2, SequenceNumber>> globalUnionState = new ArrayList<>(4);
+		List<Tuple2<StreamShardMetadata, SequenceNumber>> globalUnionState = new ArrayList<>(4);
 		globalUnionState.add(Tuple2.of(
-			KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream",
+			KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)))),
 			new SequenceNumber("1")));
 		globalUnionState.add(Tuple2.of(
-			KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream",
+			KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1)))),
 			new SequenceNumber("1")));
 		globalUnionState.add(Tuple2.of(
-			KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream",
+			KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2)))),
 			new SequenceNumber("1")));
 		globalUnionState.add(Tuple2.of(
-			KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream",
+			KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(3)))),
 			new SequenceNumber("1")));
 
-		TestingListState<Tuple2<KinesisStreamShardV2, SequenceNumber>> listState = new TestingListState<>();
-		for (Tuple2<KinesisStreamShardV2, SequenceNumber> state : globalUnionState) {
+		TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = new TestingListState<>();
+		for (Tuple2<StreamShardMetadata, SequenceNumber> state : globalUnionState) {
 			listState.add(state);
 		}
 
@@ -606,23 +606,23 @@ public class FlinkKinesisConsumerTest {
 		config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
 		config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
 
-		ArrayList<Tuple2<KinesisStreamShardV2, SequenceNumber>> initialState = new ArrayList<>(1);
+		ArrayList<Tuple2<StreamShardMetadata, SequenceNumber>> initialState = new ArrayList<>(1);
 		initialState.add(Tuple2.of(
-			KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream1",
+			KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream1",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)))),
 			new SequenceNumber("1")));
 
-		ArrayList<Tuple2<KinesisStreamShardV2, SequenceNumber>> expectedStateSnapshot = new ArrayList<>(3);
+		ArrayList<Tuple2<StreamShardMetadata, SequenceNumber>> expectedStateSnapshot = new ArrayList<>(3);
 		expectedStateSnapshot.add(Tuple2.of(
-			KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream1",
+			KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream1",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)))),
 			new SequenceNumber("12")));
 		expectedStateSnapshot.add(Tuple2.of(
-			KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream1",
+			KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream1",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1)))),
 			new SequenceNumber("11")));
 		expectedStateSnapshot.add(Tuple2.of(
-			KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream1",
+			KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream1",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2)))),
 			new SequenceNumber("31")));
 
@@ -630,8 +630,8 @@ public class FlinkKinesisConsumerTest {
 		// mock operator state backend and initial state for initializeState()
 		// ----------------------------------------------------------------------
 
-		TestingListState<Tuple2<KinesisStreamShardV2, SequenceNumber>> listState = new TestingListState<>();
-		for (Tuple2<KinesisStreamShardV2, SequenceNumber> state : initialState) {
+		TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = new TestingListState<>();
+		for (Tuple2<StreamShardMetadata, SequenceNumber> state : initialState) {
 			listState.add(state);
 		}
 
@@ -646,8 +646,8 @@ public class FlinkKinesisConsumerTest {
 		// mock a running fetcher and its state for snapshot
 		// ----------------------------------------------------------------------
 
-		HashMap<KinesisStreamShardV2, SequenceNumber> stateSnapshot = new HashMap<>();
-		for (Tuple2<KinesisStreamShardV2, SequenceNumber> tuple : expectedStateSnapshot) {
+		HashMap<StreamShardMetadata, SequenceNumber> stateSnapshot = new HashMap<>();
+		for (Tuple2<StreamShardMetadata, SequenceNumber> tuple : expectedStateSnapshot) {
 			stateSnapshot.put(tuple.f0, tuple.f1);
 		}
 
@@ -674,15 +674,15 @@ public class FlinkKinesisConsumerTest {
 		assertEquals(true, listState.clearCalled);
 		assertEquals(3, listState.getList().size());
 
-		for (Tuple2<KinesisStreamShardV2, SequenceNumber> state : initialState) {
-			for (Tuple2<KinesisStreamShardV2, SequenceNumber> currentState : listState.getList()) {
+		for (Tuple2<StreamShardMetadata, SequenceNumber> state : initialState) {
+			for (Tuple2<StreamShardMetadata, SequenceNumber> currentState : listState.getList()) {
 				assertNotEquals(state, currentState);
 			}
 		}
 
-		for (Tuple2<KinesisStreamShardV2, SequenceNumber> state : expectedStateSnapshot) {
+		for (Tuple2<StreamShardMetadata, SequenceNumber> state : expectedStateSnapshot) {
 			boolean hasOneIsSame = false;
-			for (Tuple2<KinesisStreamShardV2, SequenceNumber> currentState : listState.getList()) {
+			for (Tuple2<StreamShardMetadata, SequenceNumber> currentState : listState.getList()) {
 				hasOneIsSame = hasOneIsSame || state.equals(currentState);
 			}
 			assertEquals(true, hasOneIsSame);
@@ -736,7 +736,7 @@ public class FlinkKinesisConsumerTest {
 
 		for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
 			Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
-				new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredShard.getKey()),
+				new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredShard.getKey()),
 					restoredShard.getKey(), restoredShard.getValue()));
 		}
 	}
@@ -755,9 +755,9 @@ public class FlinkKinesisConsumerTest {
 		// mock operator state backend and initial state for initializeState()
 		// ----------------------------------------------------------------------
 
-		TestingListState<Tuple2<KinesisStreamShardV2, SequenceNumber>> listState = new TestingListState<>();
+		TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = new TestingListState<>();
 		for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredState.entrySet()) {
-			listState.add(Tuple2.of(KinesisDataFetcher.createKinesisStreamShardV2(state.getKey()), state.getValue()));
+			listState.add(Tuple2.of(KinesisDataFetcher.convertToStreamShardMetadata(state.getKey()), state.getValue()));
 		}
 
 		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
@@ -793,7 +793,7 @@ public class FlinkKinesisConsumerTest {
 
 		for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
 			Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
-				new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredShard.getKey()),
+				new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredShard.getKey()),
 					restoredShard.getKey(), restoredShard.getValue()));
 		}
 	}
@@ -814,12 +814,12 @@ public class FlinkKinesisConsumerTest {
 		// mock operator state backend and initial state for initializeState()
 		// ----------------------------------------------------------------------
 
-		TestingListState<Tuple2<KinesisStreamShardV2, SequenceNumber>> listState = new TestingListState<>();
+		TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = new TestingListState<>();
 		for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredState.entrySet()) {
-			listState.add(Tuple2.of(KinesisDataFetcher.createKinesisStreamShardV2(state.getKey()), state.getValue()));
+			listState.add(Tuple2.of(KinesisDataFetcher.convertToStreamShardMetadata(state.getKey()), state.getValue()));
 		}
 		for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredStateForOthers.entrySet()) {
-			listState.add(Tuple2.of(KinesisDataFetcher.createKinesisStreamShardV2(state.getKey()), state.getValue()));
+			listState.add(Tuple2.of(KinesisDataFetcher.convertToStreamShardMetadata(state.getKey()), state.getValue()));
 		}
 
 		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
@@ -856,13 +856,13 @@ public class FlinkKinesisConsumerTest {
 		for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredStateForOthers.entrySet()) {
 			// should never get restored state not belonging to itself
 			Mockito.verify(mockedFetcher, never()).registerNewSubscribedShardState(
-				new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredShard.getKey()),
+				new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredShard.getKey()),
 					restoredShard.getKey(), restoredShard.getValue()));
 		}
 		for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
 			// should get restored state belonging to itself
 			Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
-				new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredShard.getKey()),
+				new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredShard.getKey()),
 					restoredShard.getKey(), restoredShard.getValue()));
 		}
 	}
@@ -910,9 +910,9 @@ public class FlinkKinesisConsumerTest {
 		// mock operator state backend and initial state for initializeState()
 		// ----------------------------------------------------------------------
 
-		TestingListState<Tuple2<KinesisStreamShardV2, SequenceNumber>> listState = new TestingListState<>();
+		TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = new TestingListState<>();
 		for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredState.entrySet()) {
-			listState.add(Tuple2.of(KinesisDataFetcher.createKinesisStreamShardV2(state.getKey()), state.getValue()));
+			listState.add(Tuple2.of(KinesisDataFetcher.convertToStreamShardMetadata(state.getKey()), state.getValue()));
 		}
 
 		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
@@ -953,13 +953,13 @@ public class FlinkKinesisConsumerTest {
 			SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
 		for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
 			Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
-				new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredShard.getKey()),
+				new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredShard.getKey()),
 					restoredShard.getKey(), restoredShard.getValue()));
 		}
 	}
 
 	@Test
-	public void testCreateFunctionToConvertBetweenKinesisStreamShardAndKinesisStreamShardV2() {
+	public void testLegacyKinesisStreamShardToStreamShardMetadataConversion() {
 		String streamName = "fakeStream1";
 		String shardId = "shard-000001";
 		String parentShardId = "shard-000002";
@@ -969,15 +969,15 @@ public class FlinkKinesisConsumerTest {
 		String startingSequenceNumber = "seq-0000021";
 		String endingSequenceNumber = "seq-00000031";
 
-		KinesisStreamShardV2 kinesisStreamShardV2 = new KinesisStreamShardV2();
-		kinesisStreamShardV2.setStreamName(streamName);
-		kinesisStreamShardV2.setShardId(shardId);
-		kinesisStreamShardV2.setParentShardId(parentShardId);
-		kinesisStreamShardV2.setAdjacentParentShardId(adjacentParentShardId);
-		kinesisStreamShardV2.setStartingHashKey(startingHashKey);
-		kinesisStreamShardV2.setEndingHashKey(endingHashKey);
-		kinesisStreamShardV2.setStartingSequenceNumber(startingSequenceNumber);
-		kinesisStreamShardV2.setEndingSequenceNumber(endingSequenceNumber);
+		StreamShardMetadata streamShardMetadata = new StreamShardMetadata();
+		streamShardMetadata.setStreamName(streamName);
+		streamShardMetadata.setShardId(shardId);
+		streamShardMetadata.setParentShardId(parentShardId);
+		streamShardMetadata.setAdjacentParentShardId(adjacentParentShardId);
+		streamShardMetadata.setStartingHashKey(startingHashKey);
+		streamShardMetadata.setEndingHashKey(endingHashKey);
+		streamShardMetadata.setStartingSequenceNumber(startingSequenceNumber);
+		streamShardMetadata.setEndingSequenceNumber(endingSequenceNumber);
 
 		Shard shard = new Shard()
 			.withShardId(shardId)
@@ -991,12 +991,12 @@ public class FlinkKinesisConsumerTest {
 				.withEndingSequenceNumber(endingSequenceNumber));
 		KinesisStreamShard kinesisStreamShard = new KinesisStreamShard(streamName, shard);
 
-		assertEquals(kinesisStreamShardV2, FlinkKinesisConsumer.createKinesisStreamShardV2(kinesisStreamShard));
+		assertEquals(streamShardMetadata, KinesisStreamShard.convertToStreamShardMetadata(kinesisStreamShard));
 	}
 
 	@Test
-	public void testKinesisStreamShardV2WillUsePojoSerializer() {
-		TypeInformation<KinesisStreamShardV2> typeInformation = TypeInformation.of(KinesisStreamShardV2.class);
+	public void testStreamShardMetadataSerializedUsingPojoSerializer() {
+		TypeInformation<StreamShardMetadata> typeInformation = TypeInformation.of(StreamShardMetadata.class);
 		assertTrue(typeInformation.createSerializer(new ExecutionConfig()) instanceof PojoSerializer);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2597e7e1/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
index 7c36945..4fb6dd4 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConsta
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardV2;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisBehavioursFactory;
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
@@ -205,7 +205,7 @@ public class KinesisDataFetcherTest {
 		for (Map.Entry<StreamShardHandle, String> restoredState : restoredStateUnderTest.entrySet()) {
 			fetcher.advanceLastDiscoveredShardOfStream(restoredState.getKey().getStreamName(), restoredState.getKey().getShard().getShardId());
 			fetcher.registerNewSubscribedShardState(
-				new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredState.getKey()),
+				new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredState.getKey()),
 					restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
 		}
 
@@ -296,7 +296,7 @@ public class KinesisDataFetcherTest {
 		for (Map.Entry<StreamShardHandle, String> restoredState : restoredStateUnderTest.entrySet()) {
 			fetcher.advanceLastDiscoveredShardOfStream(restoredState.getKey().getStreamName(), restoredState.getKey().getShard().getShardId());
 			fetcher.registerNewSubscribedShardState(
-				new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredState.getKey()),
+				new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredState.getKey()),
 					restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
 		}
 
@@ -391,7 +391,7 @@ public class KinesisDataFetcherTest {
 		for (Map.Entry<StreamShardHandle, String> restoredState : restoredStateUnderTest.entrySet()) {
 			fetcher.advanceLastDiscoveredShardOfStream(restoredState.getKey().getStreamName(), restoredState.getKey().getShard().getShardId());
 			fetcher.registerNewSubscribedShardState(
-				new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredState.getKey()),
+				new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredState.getKey()),
 					restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
 		}
 
@@ -487,7 +487,7 @@ public class KinesisDataFetcherTest {
 		for (Map.Entry<StreamShardHandle, String> restoredState : restoredStateUnderTest.entrySet()) {
 			fetcher.advanceLastDiscoveredShardOfStream(restoredState.getKey().getStreamName(), restoredState.getKey().getShard().getShardId());
 			fetcher.registerNewSubscribedShardState(
-				new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredState.getKey()),
+				new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredState.getKey()),
 					restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
 		}
 
@@ -521,7 +521,7 @@ public class KinesisDataFetcherTest {
 	}
 
 	@Test
-	public void testCreateFunctionToConvertBetweenKinesisStreamShardV2AndStreamShardHandle() {
+	public void testStreamShardMetadataAndHandleConversion() {
 		String streamName = "fakeStream1";
 		String shardId = "shard-000001";
 		String parentShardId = "shard-000002";
@@ -531,7 +531,7 @@ public class KinesisDataFetcherTest {
 		String startingSequenceNumber = "seq-0000021";
 		String endingSequenceNumber = "seq-00000031";
 
-		KinesisStreamShardV2 kinesisStreamShard = new KinesisStreamShardV2();
+		StreamShardMetadata kinesisStreamShard = new StreamShardMetadata();
 		kinesisStreamShard.setStreamName(streamName);
 		kinesisStreamShard.setShardId(shardId);
 		kinesisStreamShard.setParentShardId(parentShardId);
@@ -553,8 +553,8 @@ public class KinesisDataFetcherTest {
 				.withEndingSequenceNumber(endingSequenceNumber));
 		StreamShardHandle streamShardHandle = new StreamShardHandle(streamName, shard);
 
-		assertEquals(kinesisStreamShard, KinesisDataFetcher.createKinesisStreamShardV2(streamShardHandle));
-		assertEquals(streamShardHandle, KinesisDataFetcher.createStreamShardHandle(kinesisStreamShard));
+		assertEquals(kinesisStreamShard, KinesisDataFetcher.convertToStreamShardMetadata(streamShardHandle));
+		assertEquals(streamShardHandle, KinesisDataFetcher.convertToStreamShardHandle(kinesisStreamShard));
 	}
 
 	private static class DummyFlinkKafkaConsumer<T> extends FlinkKinesisConsumer<T> {

http://git-wip-us.apache.org/repos/asf/flink/blob/2597e7e1/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
index 4e06329..b22ba0c 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
@@ -54,7 +54,7 @@ public class ShardConsumerTest {
 
 		LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
 		subscribedShardsStateUnderTest.add(
-			new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(fakeToBeConsumedShard),
+			new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(fakeToBeConsumedShard),
 				fakeToBeConsumedShard, new SequenceNumber("fakeStartingState")));
 
 		TestableKinesisDataFetcher fetcher =
@@ -93,7 +93,7 @@ public class ShardConsumerTest {
 
 		LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
 		subscribedShardsStateUnderTest.add(
-			new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(fakeToBeConsumedShard),
+			new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(fakeToBeConsumedShard),
 				fakeToBeConsumedShard, new SequenceNumber("fakeStartingState")));
 
 		TestableKinesisDataFetcher fetcher =