You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/05/26 08:27:54 UTC

[2/8] flink git commit: [FLINK-6653] Avoid directly serializing AWS's Shard class in Kinesis consumer's checkpoints

[FLINK-6653] Avoid directly serializing AWS's Shard class in Kinesis consumer's checkpoints


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/913be2f5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/913be2f5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/913be2f5

Branch: refs/heads/master
Commit: 913be2f5ca3e1f87104273f25c7f4e722b23e562
Parents: 15856af
Author: Tony Wei <to...@gmail.com>
Authored: Thu May 25 10:39:22 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri May 26 16:27:18 2017 +0800

----------------------------------------------------------------------
 .../kinesis/FlinkKinesisConsumer.java           |  71 +++++--
 .../kinesis/internals/KinesisDataFetcher.java   |  92 +++++++--
 .../kinesis/internals/ShardConsumer.java        |   8 +-
 .../kinesis/model/KinesisStreamShard.java       |   5 +-
 .../kinesis/model/KinesisStreamShardState.java  |  21 +-
 .../kinesis/model/KinesisStreamShardV2.java     | 171 ++++++++++++++++
 .../kinesis/model/StreamShardHandle.java        | 129 ++++++++++++
 .../kinesis/proxy/GetShardListResult.java       |  16 +-
 .../connectors/kinesis/proxy/KinesisProxy.java  |  12 +-
 .../kinesis/proxy/KinesisProxyInterface.java    |   4 +-
 .../FlinkKinesisConsumerMigrationTest.java      |   7 +-
 .../kinesis/FlinkKinesisConsumerTest.java       | 205 ++++++++++++-------
 .../internals/KinesisDataFetcherTest.java       | 111 +++++++---
 .../kinesis/internals/ShardConsumerTest.java    |  16 +-
 .../testutils/FakeKinesisBehavioursFactory.java |  22 +-
 15 files changed, 698 insertions(+), 192 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/913be2f5/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index 4982f7f..b7f5506 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -38,6 +38,8 @@ import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardV2;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
@@ -98,7 +100,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 	private transient KinesisDataFetcher<T> fetcher;
 
 	/** The sequence numbers to restore to upon restore from failure */
-	private transient HashMap<KinesisStreamShard, SequenceNumber> sequenceNumsToRestore;
+	private transient HashMap<KinesisStreamShardV2, SequenceNumber> sequenceNumsToRestore;
 
 	private volatile boolean running = true;
 
@@ -109,7 +111,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 	/** State name to access shard sequence number states; cannot be changed */
 	private static final String sequenceNumsStateStoreName = "Kinesis-Stream-Shard-State";
 
-	private transient ListState<Tuple2<KinesisStreamShard, SequenceNumber>> sequenceNumsStateForCheckpoint;
+	private transient ListState<Tuple2<KinesisStreamShardV2, SequenceNumber>> sequenceNumsStateForCheckpoint;
 
 	// ------------------------------------------------------------------------
 	//  Constructors
@@ -197,25 +199,26 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 		KinesisDataFetcher<T> fetcher = createFetcher(streams, sourceContext, getRuntimeContext(), configProps, deserializer);
 
 		// initial discovery
-		List<KinesisStreamShard> allShards = fetcher.discoverNewShardsToSubscribe();
+		List<StreamShardHandle> allShards = fetcher.discoverNewShardsToSubscribe();
 
-		for (KinesisStreamShard shard : allShards) {
+		for (StreamShardHandle shard : allShards) {
+			KinesisStreamShardV2 kinesisStreamShard = KinesisDataFetcher.createKinesisStreamShardV2(shard);
 			if (sequenceNumsToRestore != null) {
-				if (sequenceNumsToRestore.containsKey(shard)) {
+				if (sequenceNumsToRestore.containsKey(kinesisStreamShard)) {
 					// if the shard was already seen and is contained in the state,
 					// just use the sequence number stored in the state
 					fetcher.registerNewSubscribedShardState(
-						new KinesisStreamShardState(shard, sequenceNumsToRestore.get(shard)));
+						new KinesisStreamShardState(kinesisStreamShard, shard, sequenceNumsToRestore.get(kinesisStreamShard)));
 
 					if (LOG.isInfoEnabled()) {
 						LOG.info("Subtask {} is seeding the fetcher with restored shard {}," +
 								" starting state set to the restored sequence number {}",
-							getRuntimeContext().getIndexOfThisSubtask(), shard.toString(), sequenceNumsToRestore.get(shard));
+							getRuntimeContext().getIndexOfThisSubtask(), shard.toString(), sequenceNumsToRestore.get(kinesisStreamShard));
 					}
 				} else {
 					// the shard wasn't discovered in the previous run, therefore should be consumed from the beginning
 					fetcher.registerNewSubscribedShardState(
-						new KinesisStreamShardState(shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get()));
+						new KinesisStreamShardState(kinesisStreamShard, shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get()));
 
 					if (LOG.isInfoEnabled()) {
 						LOG.info("Subtask {} is seeding the fetcher with new discovered shard {}," +
@@ -231,7 +234,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 						ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION)).toSentinelSequenceNumber();
 
 				fetcher.registerNewSubscribedShardState(
-					new KinesisStreamShardState(shard, startingSeqNum.get()));
+					new KinesisStreamShardState(kinesisStreamShard, shard, startingSeqNum.get()));
 
 				if (LOG.isInfoEnabled()) {
 					LOG.info("Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}",
@@ -295,8 +298,8 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 
 	@Override
 	public void initializeState(FunctionInitializationContext context) throws Exception {
-		TypeInformation<Tuple2<KinesisStreamShard, SequenceNumber>> shardsStateTypeInfo = new TupleTypeInfo<>(
-			TypeInformation.of(KinesisStreamShard.class),
+		TypeInformation<Tuple2<KinesisStreamShardV2, SequenceNumber>> shardsStateTypeInfo = new TupleTypeInfo<>(
+			TypeInformation.of(KinesisStreamShardV2.class),
 			TypeInformation.of(SequenceNumber.class));
 
 		sequenceNumsStateForCheckpoint = context.getOperatorStateStore().getUnionListState(
@@ -305,7 +308,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 		if (context.isRestored()) {
 			if (sequenceNumsToRestore == null) {
 				sequenceNumsToRestore = new HashMap<>();
-				for (Tuple2<KinesisStreamShard, SequenceNumber> kinesisSequenceNumber : sequenceNumsStateForCheckpoint.get()) {
+				for (Tuple2<KinesisStreamShardV2, SequenceNumber> kinesisSequenceNumber : sequenceNumsStateForCheckpoint.get()) {
 					sequenceNumsToRestore.put(kinesisSequenceNumber.f0, kinesisSequenceNumber.f1);
 				}
 
@@ -330,12 +333,12 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 
 			if (fetcher == null) {
 				if (sequenceNumsToRestore != null) {
-					for (Map.Entry<KinesisStreamShard, SequenceNumber> entry : sequenceNumsToRestore.entrySet()) {
+					for (Map.Entry<KinesisStreamShardV2, SequenceNumber> entry : sequenceNumsToRestore.entrySet()) {
 						// sequenceNumsToRestore is the restored global union state;
 						// should only snapshot shards that actually belong to us
 
 						if (KinesisDataFetcher.isThisSubtaskShouldSubscribeTo(
-								entry.getKey(),
+								KinesisDataFetcher.createStreamShardHandle(entry.getKey()),
 								getRuntimeContext().getNumberOfParallelSubtasks(),
 								getRuntimeContext().getIndexOfThisSubtask())) {
 
@@ -344,14 +347,14 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 					}
 				}
 			} else {
-				HashMap<KinesisStreamShard, SequenceNumber> lastStateSnapshot = fetcher.snapshotState();
+				HashMap<KinesisStreamShardV2, SequenceNumber> lastStateSnapshot = fetcher.snapshotState();
 
 				if (LOG.isDebugEnabled()) {
 					LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
 						lastStateSnapshot.toString(), context.getCheckpointId(), context.getCheckpointTimestamp());
 				}
 
-				for (Map.Entry<KinesisStreamShard, SequenceNumber> entry : lastStateSnapshot.entrySet()) {
+				for (Map.Entry<KinesisStreamShardV2, SequenceNumber> entry : lastStateSnapshot.entrySet()) {
 					sequenceNumsStateForCheckpoint.add(Tuple2.of(entry.getKey(), entry.getValue()));
 				}
 			}
@@ -363,7 +366,14 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 		LOG.info("Subtask {} restoring offsets from an older Flink version: {}",
 			getRuntimeContext().getIndexOfThisSubtask(), sequenceNumsToRestore);
 
-		sequenceNumsToRestore = restoredState.isEmpty() ? null : restoredState;
+		if (restoredState.isEmpty()) {
+			sequenceNumsToRestore = null;
+		} else {
+			sequenceNumsToRestore = new HashMap<>();
+			for (Map.Entry<KinesisStreamShard, SequenceNumber> kv: restoredState.entrySet()) {
+				sequenceNumsToRestore.put(createKinesisStreamShardV2(kv.getKey()), kv.getValue());
+			}
+		}
 	}
 
 	/** This method is exposed for tests that need to mock the KinesisDataFetcher in the consumer. */
@@ -378,7 +388,32 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 	}
 
 	@VisibleForTesting
-	HashMap<KinesisStreamShard, SequenceNumber> getRestoredState() {
+	HashMap<KinesisStreamShardV2, SequenceNumber> getRestoredState() {
 		return sequenceNumsToRestore;
 	}
+
+	/**
+	 * Utility function to convert a legacy {@link KinesisStreamShard} into a {@link KinesisStreamShardV2}.
+	 * <p>Hash key and sequence number bounds are copied only if the wrapped shard carries the respective range.
+	 * @param kinesisStreamShard the {@link KinesisStreamShard} to be converted
+	 * @return a {@link KinesisStreamShardV2} object
+	 */
+	public static KinesisStreamShardV2 createKinesisStreamShardV2(KinesisStreamShard kinesisStreamShard) {
+		KinesisStreamShardV2 kinesisStreamShardV2 = new KinesisStreamShardV2();
+
+		kinesisStreamShardV2.setStreamName(kinesisStreamShard.getStreamName());
+		kinesisStreamShardV2.setShardId(kinesisStreamShard.getShard().getShardId());
+		kinesisStreamShardV2.setParentShardId(kinesisStreamShard.getShard().getParentShardId());
+		kinesisStreamShardV2.setAdjacentParentShardId(kinesisStreamShard.getShard().getAdjacentParentShardId());
+		if (kinesisStreamShard.getShard().getHashKeyRange() != null) {
+			kinesisStreamShardV2.setStartingHashKey(kinesisStreamShard.getShard().getHashKeyRange().getStartingHashKey());
+			kinesisStreamShardV2.setEndingHashKey(kinesisStreamShard.getShard().getHashKeyRange().getEndingHashKey());
+		}
+		if (kinesisStreamShard.getShard().getSequenceNumberRange() != null) {
+			kinesisStreamShardV2.setStartingSequenceNumber(kinesisStreamShard.getShard().getSequenceNumberRange().getStartingSequenceNumber());
+			kinesisStreamShardV2.setEndingSequenceNumber(kinesisStreamShard.getShard().getSequenceNumberRange().getEndingSequenceNumber());
+		}
+
+		return kinesisStreamShardV2;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/913be2f5/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 99305cb..b0dceec 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -17,13 +17,17 @@
 
 package org.apache.flink.streaming.connectors.kinesis.internals;
 
+import com.amazonaws.services.kinesis.model.HashKeyRange;
+import com.amazonaws.services.kinesis.model.SequenceNumberRange;
+import com.amazonaws.services.kinesis.model.Shard;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardV2;
 import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
@@ -259,7 +263,7 @@ public class KinesisDataFetcher<T> {
 
 				if (LOG.isInfoEnabled()) {
 					LOG.info("Subtask {} will start consuming seeded shard {} from sequence number {} with ShardConsumer {}",
-						indexOfThisConsumerSubtask, seededShardState.getKinesisStreamShard().toString(),
+						indexOfThisConsumerSubtask, seededShardState.getStreamShardHandle().toString(),
 						seededShardState.getLastProcessedSequenceNum(), seededStateIndex);
 					}
 
@@ -267,7 +271,7 @@ public class KinesisDataFetcher<T> {
 					new ShardConsumer<>(
 						this,
 						seededStateIndex,
-						subscribedShardsState.get(seededStateIndex).getKinesisStreamShard(),
+						subscribedShardsState.get(seededStateIndex).getStreamShardHandle(),
 						subscribedShardsState.get(seededStateIndex).getLastProcessedSequenceNum()));
 			}
 		}
@@ -293,19 +297,19 @@ public class KinesisDataFetcher<T> {
 				LOG.debug("Subtask {} is trying to discover new shards that were created due to resharding ...",
 					indexOfThisConsumerSubtask);
 			}
-			List<KinesisStreamShard> newShardsDueToResharding = discoverNewShardsToSubscribe();
+			List<StreamShardHandle> newShardsDueToResharding = discoverNewShardsToSubscribe();
 
-			for (KinesisStreamShard shard : newShardsDueToResharding) {
+			for (StreamShardHandle shard : newShardsDueToResharding) {
 				// since there may be delay in discovering a new shard, all new shards due to
 				// resharding should be read starting from the earliest record possible
 				KinesisStreamShardState newShardState =
-					new KinesisStreamShardState(shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
+					new KinesisStreamShardState(createKinesisStreamShardV2(shard), shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
 				int newStateIndex = registerNewSubscribedShardState(newShardState);
 
 				if (LOG.isInfoEnabled()) {
 					LOG.info("Subtask {} has discovered a new shard {} due to resharding, and will start consuming " +
 							"the shard from sequence number {} with ShardConsumer {}",
-						indexOfThisConsumerSubtask, newShardState.getKinesisStreamShard().toString(),
+						indexOfThisConsumerSubtask, newShardState.getStreamShardHandle().toString(),
 						newShardState.getLastProcessedSequenceNum(), newStateIndex);
 				}
 
@@ -313,7 +317,7 @@ public class KinesisDataFetcher<T> {
 					new ShardConsumer<>(
 						this,
 						newStateIndex,
-						newShardState.getKinesisStreamShard(),
+						newShardState.getStreamShardHandle(),
 						newShardState.getLastProcessedSequenceNum()));
 			}
 
@@ -349,11 +353,11 @@ public class KinesisDataFetcher<T> {
 	 *
 	 * @return state snapshot
 	 */
-	public HashMap<KinesisStreamShard, SequenceNumber> snapshotState() {
+	public HashMap<KinesisStreamShardV2, SequenceNumber> snapshotState() {
 		// this method assumes that the checkpoint lock is held
 		assert Thread.holdsLock(checkpointLock);
 
-		HashMap<KinesisStreamShard, SequenceNumber> stateSnapshot = new HashMap<>();
+		HashMap<KinesisStreamShardV2, SequenceNumber> stateSnapshot = new HashMap<>();
 		for (KinesisStreamShardState shardWithState : subscribedShardsState) {
 			stateSnapshot.put(shardWithState.getKinesisStreamShard(), shardWithState.getLastProcessedSequenceNum());
 		}
@@ -405,7 +409,7 @@ public class KinesisDataFetcher<T> {
 		if (lastSeenShardIdOfStream == null) {
 			// if not previously set, simply put as the last seen shard id
 			this.subscribedStreamsToLastDiscoveredShardIds.put(stream, shardId);
-		} else if (KinesisStreamShard.compareShardIds(shardId, lastSeenShardIdOfStream) > 0) {
+		} else if (StreamShardHandle.compareShardIds(shardId, lastSeenShardIdOfStream) > 0) {
 			this.subscribedStreamsToLastDiscoveredShardIds.put(stream, shardId);
 		}
 	}
@@ -419,17 +423,17 @@ public class KinesisDataFetcher<T> {
 	 * 3. Update the subscribedStreamsToLastDiscoveredShardIds state so that we won't get shards
 	 *    that we have already seen before the next time this function is called
 	 */
-	public List<KinesisStreamShard> discoverNewShardsToSubscribe() throws InterruptedException {
+	public List<StreamShardHandle> discoverNewShardsToSubscribe() throws InterruptedException {
 
-		List<KinesisStreamShard> newShardsToSubscribe = new LinkedList<>();
+		List<StreamShardHandle> newShardsToSubscribe = new LinkedList<>();
 
 		GetShardListResult shardListResult = kinesis.getShardList(subscribedStreamsToLastDiscoveredShardIds);
 		if (shardListResult.hasRetrievedShards()) {
 			Set<String> streamsWithNewShards = shardListResult.getStreamsWithRetrievedShards();
 
 			for (String stream : streamsWithNewShards) {
-				List<KinesisStreamShard> newShardsOfStream = shardListResult.getRetrievedShardListOfStream(stream);
-				for (KinesisStreamShard newShard : newShardsOfStream) {
+				List<StreamShardHandle> newShardsOfStream = shardListResult.getRetrievedShardListOfStream(stream);
+				for (StreamShardHandle newShard : newShardsOfStream) {
 					if (isThisSubtaskShouldSubscribeTo(newShard, totalNumberOfConsumerSubtasks, indexOfThisConsumerSubtask)) {
 						newShardsToSubscribe.add(newShard);
 					}
@@ -502,7 +506,7 @@ public class KinesisDataFetcher<T> {
 			// we've finished reading the shard and should determine it to be non-active
 			if (lastSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
 				LOG.info("Subtask {} has reached the end of subscribed shard: {}",
-					indexOfThisConsumerSubtask, subscribedShardsState.get(shardStateIndex).getKinesisStreamShard());
+					indexOfThisConsumerSubtask, subscribedShardsState.get(shardStateIndex).getStreamShardHandle());
 
 				// check if we need to mark the source as idle;
 				// note that on resharding, if registerNewSubscribedShardState was invoked for newly discovered shards
@@ -549,7 +553,7 @@ public class KinesisDataFetcher<T> {
 	 * @param totalNumberOfConsumerSubtasks total number of consumer subtasks
 	 * @param indexOfThisConsumerSubtask index of this consumer subtask
 	 */
-	public static boolean isThisSubtaskShouldSubscribeTo(KinesisStreamShard shard,
+	public static boolean isThisSubtaskShouldSubscribeTo(StreamShardHandle shard,
 														int totalNumberOfConsumerSubtasks,
 														int indexOfThisConsumerSubtask) {
 		return (Math.abs(shard.hashCode() % totalNumberOfConsumerSubtasks)) == indexOfThisConsumerSubtask;
@@ -582,4 +586,58 @@ public class KinesisDataFetcher<T> {
 		}
 		return initial;
 	}
+
+	/**
+	 * Utility function to convert a {@link StreamShardHandle} into a {@link KinesisStreamShardV2}.
+	 * <p>Hash key and sequence number bounds are copied only if the wrapped shard carries the respective range.
+	 * @param streamShardHandle the {@link StreamShardHandle} to be converted
+	 * @return a {@link KinesisStreamShardV2} object
+	 */
+	public static KinesisStreamShardV2 createKinesisStreamShardV2(StreamShardHandle streamShardHandle) {
+		KinesisStreamShardV2 kinesisStreamShardV2 = new KinesisStreamShardV2();
+
+		kinesisStreamShardV2.setStreamName(streamShardHandle.getStreamName());
+		kinesisStreamShardV2.setShardId(streamShardHandle.getShard().getShardId());
+		kinesisStreamShardV2.setParentShardId(streamShardHandle.getShard().getParentShardId());
+		kinesisStreamShardV2.setAdjacentParentShardId(streamShardHandle.getShard().getAdjacentParentShardId());
+		if (streamShardHandle.getShard().getHashKeyRange() != null) {
+			kinesisStreamShardV2.setStartingHashKey(streamShardHandle.getShard().getHashKeyRange().getStartingHashKey());
+			kinesisStreamShardV2.setEndingHashKey(streamShardHandle.getShard().getHashKeyRange().getEndingHashKey());
+		}
+		if (streamShardHandle.getShard().getSequenceNumberRange() != null) {
+			kinesisStreamShardV2.setStartingSequenceNumber(streamShardHandle.getShard().getSequenceNumberRange().getStartingSequenceNumber());
+			kinesisStreamShardV2.setEndingSequenceNumber(streamShardHandle.getShard().getSequenceNumberRange().getEndingSequenceNumber());
+		}
+
+		return kinesisStreamShardV2;
+	}
+
+	/**
+	 * Utility function to convert a {@link KinesisStreamShardV2} back into a {@link StreamShardHandle}.
+	 * <p>A hash key / sequence number range is rebuilt whenever at least one of its two bounds is set.
+	 * @param kinesisStreamShard the {@link KinesisStreamShardV2} to be converted
+	 * @return a {@link StreamShardHandle} wrapping a newly built {@link Shard} under the same stream name
+	 */
+	public static StreamShardHandle createStreamShardHandle(KinesisStreamShardV2 kinesisStreamShard) {
+		Shard shard = new Shard();
+		shard.withShardId(kinesisStreamShard.getShardId());
+		shard.withParentShardId(kinesisStreamShard.getParentShardId());
+		shard.withAdjacentParentShardId(kinesisStreamShard.getAdjacentParentShardId());
+		// either bound suffices: createKinesisStreamShardV2 copies bounds whenever the range exists, even if one is null
+		if (kinesisStreamShard.getStartingHashKey() != null || kinesisStreamShard.getEndingHashKey() != null) {
+			HashKeyRange hashKeyRange = new HashKeyRange();
+			hashKeyRange.withStartingHashKey(kinesisStreamShard.getStartingHashKey());
+			hashKeyRange.withEndingHashKey(kinesisStreamShard.getEndingHashKey());
+			shard.withHashKeyRange(hashKeyRange);
+		}
+		// an open shard has no ending sequence number; requiring both bounds would drop its starting sequence number too
+		if (kinesisStreamShard.getStartingSequenceNumber() != null || kinesisStreamShard.getEndingSequenceNumber() != null) {
+			SequenceNumberRange sequenceNumberRange = new SequenceNumberRange();
+			sequenceNumberRange.withStartingSequenceNumber(kinesisStreamShard.getStartingSequenceNumber());
+			sequenceNumberRange.withEndingSequenceNumber(kinesisStreamShard.getEndingSequenceNumber());
+			shard.withSequenceNumberRange(sequenceNumberRange);
+		}
+
+		return new StreamShardHandle(kinesisStreamShard.getStreamName(), shard);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/913be2f5/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index ca85854..a724b49 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -24,7 +24,7 @@ import com.amazonaws.services.kinesis.model.Record;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
@@ -60,7 +60,7 @@ public class ShardConsumer<T> implements Runnable {
 
 	private final KinesisDataFetcher<T> fetcherRef;
 
-	private final KinesisStreamShard subscribedShard;
+	private final StreamShardHandle subscribedShard;
 
 	private final int maxNumberOfRecordsPerFetch;
 	private final long fetchIntervalMillis;
@@ -79,7 +79,7 @@ public class ShardConsumer<T> implements Runnable {
 	 */
 	public ShardConsumer(KinesisDataFetcher<T> fetcherRef,
 						Integer subscribedShardStateIndex,
-						KinesisStreamShard subscribedShard,
+						StreamShardHandle subscribedShard,
 						SequenceNumber lastSequenceNum) {
 		this(fetcherRef,
 			subscribedShardStateIndex,
@@ -91,7 +91,7 @@ public class ShardConsumer<T> implements Runnable {
 	/** This constructor is exposed for testing purposes */
 	protected ShardConsumer(KinesisDataFetcher<T> fetcherRef,
 							Integer subscribedShardStateIndex,
-							KinesisStreamShard subscribedShard,
+							StreamShardHandle subscribedShard,
 							SequenceNumber lastSequenceNum,
 							KinesisProxyInterface kinesis) {
 		this.fetcherRef = checkNotNull(fetcherRef);

http://git-wip-us.apache.org/repos/asf/flink/blob/913be2f5/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
index 53ed11b..f3dcfe1 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
@@ -24,9 +24,8 @@ import java.io.Serializable;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A serializable representation of a AWS Kinesis Stream shard. It is basically a wrapper class around the information
- * provided along with {@link com.amazonaws.services.kinesis.model.Shard}, with some extra utility methods to
- * determine whether or not a shard is closed and whether or not the shard is a result of parent shard splits or merges.
+ * A legacy serializable representation of an AWS Kinesis Stream shard. It is basically a wrapper class around the information
+ * provided along with {@link com.amazonaws.services.kinesis.model.Shard}.
  */
 public class KinesisStreamShard implements Serializable {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/913be2f5/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
index 00181da..e68129d 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
@@ -18,22 +18,28 @@
 package org.apache.flink.streaming.connectors.kinesis.model;
 
 /**
- * A wrapper class that bundles a {@link KinesisStreamShard} with its last processed sequence number.
+ * A wrapper class that bundles a {@link StreamShardHandle} with its last processed sequence number.
  */
 public class KinesisStreamShardState {
 
-	private KinesisStreamShard kinesisStreamShard;
+	private KinesisStreamShardV2 kinesisStreamShard;
+	private StreamShardHandle streamShardHandle;
 	private SequenceNumber lastProcessedSequenceNum;
 
-	public KinesisStreamShardState(KinesisStreamShard kinesisStreamShard, SequenceNumber lastProcessedSequenceNum) {
+	public KinesisStreamShardState(KinesisStreamShardV2 kinesisStreamShard, StreamShardHandle streamShardHandle, SequenceNumber lastProcessedSequenceNum) {
 		this.kinesisStreamShard = kinesisStreamShard;
+		this.streamShardHandle = streamShardHandle;
 		this.lastProcessedSequenceNum = lastProcessedSequenceNum;
 	}
 
-	public KinesisStreamShard getKinesisStreamShard() {
+	public KinesisStreamShardV2 getKinesisStreamShard() {
 		return this.kinesisStreamShard;
 	}
 
+	public StreamShardHandle getStreamShardHandle() {
+		return this.streamShardHandle;
+	}
+
 	public SequenceNumber getLastProcessedSequenceNum() {
 		return this.lastProcessedSequenceNum;
 	}
@@ -46,6 +52,7 @@ public class KinesisStreamShardState {
 	public String toString() {
 		return "KinesisStreamShardState{" +
 			"kinesisStreamShard='" + kinesisStreamShard.toString() + "'" +
+			", streamShardHandle='" + streamShardHandle.toString() + "'" +
 			", lastProcessedSequenceNumber='" + lastProcessedSequenceNum.toString() + "'}";
 	}
 
@@ -61,11 +68,13 @@ public class KinesisStreamShardState {
 
 		KinesisStreamShardState other = (KinesisStreamShardState) obj;
 
-		return kinesisStreamShard.equals(other.getKinesisStreamShard()) && lastProcessedSequenceNum.equals(other.getLastProcessedSequenceNum());
+		return kinesisStreamShard.equals(other.getKinesisStreamShard()) &&
+			streamShardHandle.equals(other.getStreamShardHandle()) &&
+			lastProcessedSequenceNum.equals(other.getLastProcessedSequenceNum());
 	}
 
 	@Override
 	public int hashCode() {
-		return 37 * (kinesisStreamShard.hashCode() + lastProcessedSequenceNum.hashCode());
+		return 37 * (kinesisStreamShard.hashCode() + streamShardHandle.hashCode() + lastProcessedSequenceNum.hashCode());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/913be2f5/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardV2.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardV2.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardV2.java
new file mode 100644
index 0000000..71cb6fa
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardV2.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.model;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * A serializable representation of an AWS Kinesis Stream shard. It is basically a wrapper class around the information
+ * extracted from {@link com.amazonaws.services.kinesis.model.Shard} and its nested classes.
+ */
+public class KinesisStreamShardV2 implements Serializable {
+
+	private static final long serialVersionUID = 5134869582298563604L;
+
+	private String streamName;
+	private String shardId;
+	private String parentShardId;
+	private String adjacentParentShardId;
+	private String startingHashKey;
+	private String endingHashKey;
+	private String startingSequenceNumber;
+	private String endingSequenceNumber;
+
+	public void setStreamName(String streamName) {
+		this.streamName = streamName;
+	}
+
+	public void setShardId(String shardId) {
+		this.shardId = shardId;
+	}
+
+	public void setParentShardId(String parentShardId) {
+		this.parentShardId = parentShardId;
+	}
+
+	public void setAdjacentParentShardId(String adjacentParentShardId) {
+		this.adjacentParentShardId = adjacentParentShardId;
+	}
+
+	public void setStartingHashKey(String startingHashKey) {
+		this.startingHashKey = startingHashKey;
+	}
+
+	public void setEndingHashKey(String endingHashKey) {
+		this.endingHashKey = endingHashKey;
+	}
+
+	public void setStartingSequenceNumber(String startingSequenceNumber) {
+		this.startingSequenceNumber = startingSequenceNumber;
+	}
+
+	public void setEndingSequenceNumber(String endingSequenceNumber) {
+		this.endingSequenceNumber = endingSequenceNumber;
+	}
+
+	public String getStreamName() {
+		return this.streamName;
+	}
+
+	public String getShardId() {
+		return this.shardId;
+	}
+
+	public String getParentShardId() {
+		return this.parentShardId;
+	}
+
+	public String getAdjacentParentShardId() {
+		return this.adjacentParentShardId;
+	}
+
+	public String getStartingHashKey() {
+		return this.startingHashKey;
+	}
+
+	public String getEndingHashKey() {
+		return this.endingHashKey;
+	}
+
+	public String getStartingSequenceNumber() {
+		return this.startingSequenceNumber;
+	}
+
+	public String getEndingSequenceNumber() {
+		return this.endingSequenceNumber;
+	}
+
+	@Override
+	public String toString() {
+		return "KinesisStreamShardV2{" +
+			"streamName='" + streamName + "'" +
+			", shardId='" + shardId + "'" +
+			", parentShardId='" + parentShardId + "'" +
+			", adjacentParentShardId='" + adjacentParentShardId + "'" +
+			", startingHashKey='" + startingHashKey + "'" +
+			", endingHashKey='" + endingHashKey + "'" +
+			", startingSequenceNumber='" + startingSequenceNumber + "'" +
+			", endingSequenceNumber='" + endingSequenceNumber + "'}";
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (!(obj instanceof KinesisStreamShardV2)) {
+			return false;
+		}
+
+		if (obj == this) {
+			return true;
+		}
+
+		KinesisStreamShardV2 other = (KinesisStreamShardV2) obj;
+		// every field is populated through a setter and may still be null, so compare null-safely (as hashCode() does)
+		return Objects.equals(streamName, other.getStreamName()) &&
+			Objects.equals(shardId, other.getShardId()) &&
+			Objects.equals(parentShardId, other.getParentShardId()) &&
+			Objects.equals(adjacentParentShardId, other.getAdjacentParentShardId()) &&
+			Objects.equals(startingHashKey, other.getStartingHashKey()) &&
+			Objects.equals(endingHashKey, other.getEndingHashKey()) &&
+			Objects.equals(startingSequenceNumber, other.getStartingSequenceNumber()) &&
+			Objects.equals(endingSequenceNumber, other.getEndingSequenceNumber());
+	}
+
+	@Override
+	public int hashCode() {
+		int hash = 17;
+
+		if (streamName != null) {
+			hash = 37 * hash + streamName.hashCode();
+		}
+		if (shardId != null) {
+			hash = 37 * hash + shardId.hashCode();
+		}
+		if (parentShardId != null) {
+			hash = 37 * hash + parentShardId.hashCode();
+		}
+		if (adjacentParentShardId != null) {
+			hash = 37 * hash + adjacentParentShardId.hashCode();
+		}
+		if (startingHashKey != null) {
+			hash = 37 * hash + startingHashKey.hashCode();
+		}
+		if (endingHashKey != null) {
+			hash = 37 * hash + endingHashKey.hashCode();
+		}
+		if (startingSequenceNumber != null) {
+			hash = 37 * hash + startingSequenceNumber.hashCode();
+		}
+		if (endingSequenceNumber != null) {
+			hash = 37 * hash + endingSequenceNumber.hashCode();
+		}
+
+		return hash;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/913be2f5/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardHandle.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardHandle.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardHandle.java
new file mode 100644
index 0000000..d340a88
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardHandle.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.model;
+
+import com.amazonaws.services.kinesis.model.Shard;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A wrapper class around the information provided along with streamName and {@link com.amazonaws.services.kinesis.model.Shard},
+ * with some extra utility methods to determine whether or not a shard is closed and whether or not the shard is
+ * a result of parent shard splits or merges.
+ */
+public class StreamShardHandle {
+
+	private final String streamName;
+	private final Shard shard;
+
+	private final int cachedHash;
+
+	/**
+	 * Create a new StreamShardHandle
+	 *
+	 * @param streamName
+	 *           the name of the Kinesis stream that this shard belongs to
+	 * @param shard
+	 *           the actual AWS Shard instance that will be wrapped within this StreamShardHandle
+	 */
+	public StreamShardHandle(String streamName, Shard shard) {
+		this.streamName = checkNotNull(streamName);
+		this.shard = checkNotNull(shard);
+
+		// since our description of Kinesis Streams shards can be fully defined with the stream name and shard id,
+		// our hash doesn't need to use hash code of Amazon's description of Shards, which uses other info for calculation
+		int hash = 17;
+		hash = 37 * hash + streamName.hashCode();
+		hash = 37 * hash + shard.getShardId().hashCode();
+		this.cachedHash = hash;
+	}
+
+	public String getStreamName() {
+		return streamName;
+	}
+
+	public boolean isClosed() {
+		return (shard.getSequenceNumberRange().getEndingSequenceNumber() != null);
+	}
+
+	public Shard getShard() {
+		return shard;
+	}
+
+	@Override
+	public String toString() {
+		return "StreamShardHandle{" +
+			"streamName='" + streamName + "'" +
+			", shard='" + shard.toString() + "'}";
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (!(obj instanceof StreamShardHandle)) {
+			return false;
+		}
+
+		if (obj == this) {
+			return true;
+		}
+
+		StreamShardHandle other = (StreamShardHandle) obj;
+
+		return streamName.equals(other.getStreamName()) && shard.equals(other.getShard());
+	}
+
+	@Override
+	public int hashCode() {
+		return cachedHash;
+	}
+
+	/**
+	 * Utility function to compare two shard ids
+	 *
+	 * @param firstShardId first shard id to compare
+	 * @param secondShardId second shard id to compare
+	 * @return a value less than 0 if the first shard id is smaller than the second shard id,
+	 *         or a value larger than 0 if the first shard id is larger than the second shard id,
+	 *         or 0 if they are equal
+	 */
+	public static int compareShardIds(String firstShardId, String secondShardId) {
+		if (!isValidShardId(firstShardId)) {
+			throw new IllegalArgumentException("The first shard id has invalid format.");
+		}
+
+		if (!isValidShardId(secondShardId)) {
+			throw new IllegalArgumentException("The second shard id has invalid format.");
+		}
+
+		// digit segment of the shard id starts at index 8
+		return Long.compare(Long.parseLong(firstShardId.substring(8)), Long.parseLong(secondShardId.substring(8)));
+	}
+
+	/**
+	 * Checks if a shard id has valid format.
+	 * Kinesis stream shard ids have 12-digit numbers left-padded with 0's,
+	 * prefixed with "shardId-", ex. "shardId-000000000015".
+	 *
+	 * @param shardId the shard id to check
+	 * @return whether the shard id is valid
+	 */
+	public static boolean isValidShardId(String shardId) {
+		if (shardId == null) { return false; }
+		return shardId.matches("^shardId-\\d{12}");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/913be2f5/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
index 04b1654..aadb31c 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kinesis.proxy;
 
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 
 import java.util.LinkedList;
 import java.util.List;
@@ -30,25 +30,25 @@ import java.util.Set;
  */
 public class GetShardListResult {
 
-	private final Map<String, LinkedList<KinesisStreamShard>> streamsToRetrievedShardList = new HashMap<>();
+	private final Map<String, LinkedList<StreamShardHandle>> streamsToRetrievedShardList = new HashMap<>();
 
-	public void addRetrievedShardToStream(String stream, KinesisStreamShard retrievedShard) {
+	public void addRetrievedShardToStream(String stream, StreamShardHandle retrievedShard) {
 		if (!streamsToRetrievedShardList.containsKey(stream)) {
-			streamsToRetrievedShardList.put(stream, new LinkedList<KinesisStreamShard>());
+			streamsToRetrievedShardList.put(stream, new LinkedList<StreamShardHandle>());
 		}
 		streamsToRetrievedShardList.get(stream).add(retrievedShard);
 	}
 
-	public void addRetrievedShardsToStream(String stream, List<KinesisStreamShard> retrievedShards) {
+	public void addRetrievedShardsToStream(String stream, List<StreamShardHandle> retrievedShards) {
 		if (retrievedShards.size() != 0) {
 			if (!streamsToRetrievedShardList.containsKey(stream)) {
-				streamsToRetrievedShardList.put(stream, new LinkedList<KinesisStreamShard>());
+				streamsToRetrievedShardList.put(stream, new LinkedList<StreamShardHandle>());
 			}
 			streamsToRetrievedShardList.get(stream).addAll(retrievedShards);
 		}
 	}
 
-	public List<KinesisStreamShard> getRetrievedShardListOfStream(String stream) {
+	public List<StreamShardHandle> getRetrievedShardListOfStream(String stream) {
 		if (!streamsToRetrievedShardList.containsKey(stream)) {
 			return null;
 		} else {
@@ -56,7 +56,7 @@ public class GetShardListResult {
 		}
 	}
 
-	public KinesisStreamShard getLastSeenShardOfStream(String stream) {
+	public StreamShardHandle getLastSeenShardOfStream(String stream) {
 		if (!streamsToRetrievedShardList.containsKey(stream)) {
 			return null;
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/913be2f5/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 580555f..70c1286 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -32,7 +32,7 @@ import com.amazonaws.services.kinesis.model.Shard;
 import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -237,7 +237,7 @@ public class KinesisProxy implements KinesisProxyInterface {
 	 * {@inheritDoc}
 	 */
 	@Override
-	public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable Object startingMarker) throws InterruptedException {
+	public String getShardIterator(StreamShardHandle shard, String shardIteratorType, @Nullable Object startingMarker) throws InterruptedException {
 		GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
 			.withStreamName(shard.getStreamName())
 			.withShardId(shard.getShard().getShardId())
@@ -315,8 +315,8 @@ public class KinesisProxy implements KinesisProxyInterface {
 		}
 	}
 
-	private List<KinesisStreamShard> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException {
-		List<KinesisStreamShard> shardsOfStream = new ArrayList<>();
+	private List<StreamShardHandle> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException {
+		List<StreamShardHandle> shardsOfStream = new ArrayList<>();
 
 		DescribeStreamResult describeStreamResult;
 		do {
@@ -324,7 +324,7 @@ public class KinesisProxy implements KinesisProxyInterface {
 
 			List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
 			for (Shard shard : shards) {
-				shardsOfStream.add(new KinesisStreamShard(streamName, shard));
+				shardsOfStream.add(new StreamShardHandle(streamName, shard));
 			}
 
 			if (shards.size() != 0) {
@@ -384,7 +384,7 @@ public class KinesisProxy implements KinesisProxyInterface {
 			List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
 			Iterator<Shard> shardItr = shards.iterator();
 			while (shardItr.hasNext()) {
-				if (KinesisStreamShard.compareShardIds(shardItr.next().getShardId(), startShardId) <= 0) {
+				if (StreamShardHandle.compareShardIds(shardItr.next().getShardId(), startShardId) <= 0) {
 					shardItr.remove();
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/913be2f5/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
index 9f6d594..807a163 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
@@ -18,7 +18,7 @@
 package org.apache.flink.streaming.connectors.kinesis.proxy;
 
 import com.amazonaws.services.kinesis.model.GetRecordsResult;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 
 import java.util.Map;
 
@@ -43,7 +43,7 @@ public interface KinesisProxyInterface {
 	 *                              operation has exceeded the rate limit; this exception will be thrown
 	 *                              if the backoff is interrupted.
 	 */
-	String getShardIterator(KinesisStreamShard shard, String shardIteratorType, Object startingMarker) throws InterruptedException;
+	String getShardIterator(StreamShardHandle shard, String shardIteratorType, Object startingMarker) throws InterruptedException;
 
 	/**
 	 * Get the next batch of data records using a specific shard iterator

http://git-wip-us.apache.org/repos/asf/flink/blob/913be2f5/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
index ec9a9b5..d2af6ad 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardV2;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
@@ -101,9 +102,9 @@ public class FlinkKinesisConsumerMigrationTest {
 		testHarness.open();
 
 		// the expected state in "kafka-consumer-migration-test-flink1.1-snapshot"
-		final HashMap<KinesisStreamShard, SequenceNumber> expectedState = new HashMap<>();
-		expectedState.put(new KinesisStreamShard("fakeStream1",
-				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
+		final HashMap<KinesisStreamShardV2, SequenceNumber> expectedState = new HashMap<>();
+		expectedState.put(FlinkKinesisConsumer.createKinesisStreamShardV2(new KinesisStreamShard("fakeStream1",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)))),
 			new SequenceNumber("987654321"));
 
 		// assert that state is correctly restored from legacy checkpoint

http://git-wip-us.apache.org/repos/asf/flink/blob/913be2f5/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index 4b178c7..760858a 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -17,12 +17,17 @@
 
 package org.apache.flink.streaming.connectors.kinesis;
 
+import com.amazonaws.services.kinesis.model.HashKeyRange;
+import com.amazonaws.services.kinesis.model.SequenceNumberRange;
 import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.state.StateInitializationContext;
@@ -36,6 +41,8 @@ import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardV2;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
@@ -62,6 +69,7 @@ import java.util.UUID;
 import static org.junit.Assert.fail;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
@@ -109,8 +117,8 @@ public class FlinkKinesisConsumerTest {
 	@Test
 	public void testCredentialProviderTypeSetToBasicButNoCredentialSetInConfig() {
 		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("Please set values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " +
-				"and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.");
+		exception.expectMessage("Please set values for AWS Access Key ID ('" + AWSConfigConstants.AWS_ACCESS_KEY_ID + "') " +
+			"and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.");
 
 		Properties testConfig = new Properties();
 		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
@@ -535,28 +543,26 @@ public class FlinkKinesisConsumerTest {
 		config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
 		config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
 
-		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
-
-		List<Tuple2<KinesisStreamShard, SequenceNumber>> globalUnionState = new ArrayList<>(4);
+		List<Tuple2<KinesisStreamShardV2, SequenceNumber>> globalUnionState = new ArrayList<>(4);
 		globalUnionState.add(Tuple2.of(
-			new KinesisStreamShard("fakeStream",
-				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
+			KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)))),
 			new SequenceNumber("1")));
 		globalUnionState.add(Tuple2.of(
-			new KinesisStreamShard("fakeStream",
-				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
+			KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1)))),
 			new SequenceNumber("1")));
 		globalUnionState.add(Tuple2.of(
-			new KinesisStreamShard("fakeStream",
-				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
+			KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2)))),
 			new SequenceNumber("1")));
 		globalUnionState.add(Tuple2.of(
-			new KinesisStreamShard("fakeStream",
-				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(3))),
+			KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(3)))),
 			new SequenceNumber("1")));
 
-		TestingListState<Tuple2<KinesisStreamShard, SequenceNumber>> listState = new TestingListState<>();
-		for (Tuple2<KinesisStreamShard, SequenceNumber> state : globalUnionState) {
+		TestingListState<Tuple2<KinesisStreamShardV2, SequenceNumber>> listState = new TestingListState<>();
+		for (Tuple2<KinesisStreamShardV2, SequenceNumber> state : globalUnionState) {
 			listState.add(state);
 		}
 
@@ -566,10 +572,10 @@ public class FlinkKinesisConsumerTest {
 		when(context.getNumberOfParallelSubtasks()).thenReturn(2);
 		consumer.setRuntimeContext(context);
 
+		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
 		when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
 
 		StateInitializationContext initializationContext = mock(StateInitializationContext.class);
-
 		when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
 		when(initializationContext.isRestored()).thenReturn(true);
 
@@ -600,32 +606,32 @@ public class FlinkKinesisConsumerTest {
 		config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
 		config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
 
-		ArrayList<Tuple2<KinesisStreamShard, SequenceNumber>> initialState = new ArrayList<>(1);
+		ArrayList<Tuple2<KinesisStreamShardV2, SequenceNumber>> initialState = new ArrayList<>(1);
 		initialState.add(Tuple2.of(
-			new KinesisStreamShard("fakeStream1",
-				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
+			KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream1",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)))),
 			new SequenceNumber("1")));
 
-		ArrayList<Tuple2<KinesisStreamShard, SequenceNumber>> expectedStateSnapshot = new ArrayList<>(3);
+		ArrayList<Tuple2<KinesisStreamShardV2, SequenceNumber>> expectedStateSnapshot = new ArrayList<>(3);
 		expectedStateSnapshot.add(Tuple2.of(
-			new KinesisStreamShard("fakeStream1",
-				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
+			KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream1",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)))),
 			new SequenceNumber("12")));
 		expectedStateSnapshot.add(Tuple2.of(
-			new KinesisStreamShard("fakeStream1",
-				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
+			KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream1",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1)))),
 			new SequenceNumber("11")));
 		expectedStateSnapshot.add(Tuple2.of(
-			new KinesisStreamShard("fakeStream1",
-				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
+			KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream1",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2)))),
 			new SequenceNumber("31")));
 
 		// ----------------------------------------------------------------------
 		// mock operator state backend and initial state for initializeState()
 		// ----------------------------------------------------------------------
 
-		TestingListState<Tuple2<KinesisStreamShard, SequenceNumber>> listState = new TestingListState<>();
-		for (Tuple2<KinesisStreamShard, SequenceNumber> state: initialState) {
+		TestingListState<Tuple2<KinesisStreamShardV2, SequenceNumber>> listState = new TestingListState<>();
+		for (Tuple2<KinesisStreamShardV2, SequenceNumber> state : initialState) {
 			listState.add(state);
 		}
 
@@ -640,8 +646,8 @@ public class FlinkKinesisConsumerTest {
 		// mock a running fetcher and its state for snapshot
 		// ----------------------------------------------------------------------
 
-		HashMap<KinesisStreamShard, SequenceNumber> stateSnapshot = new HashMap<>();
-		for (Tuple2<KinesisStreamShard, SequenceNumber> tuple: expectedStateSnapshot) {
+		HashMap<KinesisStreamShardV2, SequenceNumber> stateSnapshot = new HashMap<>();
+		for (Tuple2<KinesisStreamShardV2, SequenceNumber> tuple : expectedStateSnapshot) {
 			stateSnapshot.put(tuple.f0, tuple.f1);
 		}
 
@@ -668,15 +674,15 @@ public class FlinkKinesisConsumerTest {
 		assertEquals(true, listState.clearCalled);
 		assertEquals(3, listState.getList().size());
 
-		for (Tuple2<KinesisStreamShard, SequenceNumber> state: initialState) {
-			for (Tuple2<KinesisStreamShard, SequenceNumber> currentState: listState.getList()) {
+		for (Tuple2<KinesisStreamShardV2, SequenceNumber> state : initialState) {
+			for (Tuple2<KinesisStreamShardV2, SequenceNumber> currentState : listState.getList()) {
 				assertNotEquals(state, currentState);
 			}
 		}
 
-		for (Tuple2<KinesisStreamShard, SequenceNumber> state: expectedStateSnapshot) {
+		for (Tuple2<KinesisStreamShardV2, SequenceNumber> state : expectedStateSnapshot) {
 			boolean hasOneIsSame = false;
-			for (Tuple2<KinesisStreamShard, SequenceNumber> currentState: listState.getList()) {
+			for (Tuple2<KinesisStreamShardV2, SequenceNumber> currentState : listState.getList()) {
 				hasOneIsSame = hasOneIsSame || state.equals(currentState);
 			}
 			assertEquals(true, hasOneIsSame);
@@ -706,10 +712,14 @@ public class FlinkKinesisConsumerTest {
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testFetcherShouldBeCorrectlySeededIfRestoringFromLegacyCheckpoint() throws Exception {
-		HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");
+		HashMap<StreamShardHandle, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");
+		HashMap<KinesisStreamShard, SequenceNumber> legacyFakeRestoredState = new HashMap<>();
+		for (Map.Entry<StreamShardHandle, SequenceNumber> kv : fakeRestoredState.entrySet()) {
+			legacyFakeRestoredState.put(new KinesisStreamShard(kv.getKey().getStreamName(), kv.getKey().getShard()), kv.getValue());
+		}
 
 		KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
-		List<KinesisStreamShard> shards = new ArrayList<>();
+		List<StreamShardHandle> shards = new ArrayList<>();
 		shards.addAll(fakeRestoredState.keySet());
 		when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
 		PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
@@ -720,13 +730,14 @@ public class FlinkKinesisConsumerTest {
 
 		TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
 			"fakeStream", new Properties(), 10, 2);
-		consumer.restoreState(fakeRestoredState);
+		consumer.restoreState(legacyFakeRestoredState);
 		consumer.open(new Configuration());
 		consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
 
-		for (Map.Entry<KinesisStreamShard, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
+		for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
 			Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
-				new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue()));
+				new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredShard.getKey()),
+					restoredShard.getKey(), restoredShard.getValue()));
 		}
 	}
 
@@ -738,15 +749,15 @@ public class FlinkKinesisConsumerTest {
 		// setup initial state
 		// ----------------------------------------------------------------------
 
-		HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");
+		HashMap<StreamShardHandle, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");
 
 		// ----------------------------------------------------------------------
 		// mock operator state backend and initial state for initializeState()
 		// ----------------------------------------------------------------------
 
-		TestingListState<Tuple2<KinesisStreamShard, SequenceNumber>> listState = new TestingListState<>();
-		for (Map.Entry<KinesisStreamShard, SequenceNumber> state: fakeRestoredState.entrySet()) {
-			listState.add(Tuple2.of(state.getKey(), state.getValue()));
+		TestingListState<Tuple2<KinesisStreamShardV2, SequenceNumber>> listState = new TestingListState<>();
+		for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredState.entrySet()) {
+			listState.add(Tuple2.of(KinesisDataFetcher.createKinesisStreamShardV2(state.getKey()), state.getValue()));
 		}
 
 		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
@@ -761,7 +772,7 @@ public class FlinkKinesisConsumerTest {
 		// ----------------------------------------------------------------------
 
 		KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
-		List<KinesisStreamShard> shards = new ArrayList<>();
+		List<StreamShardHandle> shards = new ArrayList<>();
 		shards.addAll(fakeRestoredState.keySet());
 		when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
 		PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
@@ -780,9 +791,10 @@ public class FlinkKinesisConsumerTest {
 		consumer.open(new Configuration());
 		consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
 
-		for (Map.Entry<KinesisStreamShard, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
+		for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
 			Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
-				new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue()));
+				new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredShard.getKey()),
+					restoredShard.getKey(), restoredShard.getValue()));
 		}
 	}
 
@@ -794,20 +806,20 @@ public class FlinkKinesisConsumerTest {
 		// setup initial state
 		// ----------------------------------------------------------------------
 
-		HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = getFakeRestoredStore("fakeStream1");
+		HashMap<StreamShardHandle, SequenceNumber> fakeRestoredState = getFakeRestoredStore("fakeStream1");
 
-		HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredStateForOthers = getFakeRestoredStore("fakeStream2");
+		HashMap<StreamShardHandle, SequenceNumber> fakeRestoredStateForOthers = getFakeRestoredStore("fakeStream2");
 
 		// ----------------------------------------------------------------------
 		// mock operator state backend and initial state for initializeState()
 		// ----------------------------------------------------------------------
 
-		TestingListState<Tuple2<KinesisStreamShard, SequenceNumber>> listState = new TestingListState<>();
-		for (Map.Entry<KinesisStreamShard, SequenceNumber> state: fakeRestoredState.entrySet()) {
-			listState.add(Tuple2.of(state.getKey(), state.getValue()));
+		TestingListState<Tuple2<KinesisStreamShardV2, SequenceNumber>> listState = new TestingListState<>();
+		for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredState.entrySet()) {
+			listState.add(Tuple2.of(KinesisDataFetcher.createKinesisStreamShardV2(state.getKey()), state.getValue()));
 		}
-		for (Map.Entry<KinesisStreamShard, SequenceNumber> state: fakeRestoredStateForOthers.entrySet()) {
-			listState.add(Tuple2.of(state.getKey(), state.getValue()));
+		for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredStateForOthers.entrySet()) {
+			listState.add(Tuple2.of(KinesisDataFetcher.createKinesisStreamShardV2(state.getKey()), state.getValue()));
 		}
 
 		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
@@ -822,7 +834,7 @@ public class FlinkKinesisConsumerTest {
 		// ----------------------------------------------------------------------
 
 		KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
-		List<KinesisStreamShard> shards = new ArrayList<>();
+		List<StreamShardHandle> shards = new ArrayList<>();
 		shards.addAll(fakeRestoredState.keySet());
 		when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
 		PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
@@ -841,15 +853,17 @@ public class FlinkKinesisConsumerTest {
 		consumer.open(new Configuration());
 		consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
 
-		for (Map.Entry<KinesisStreamShard, SequenceNumber> restoredShard : fakeRestoredStateForOthers.entrySet()) {
+		for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredStateForOthers.entrySet()) {
 			// should never get restored state not belonging to itself
 			Mockito.verify(mockedFetcher, never()).registerNewSubscribedShardState(
-				new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue()));
+				new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredShard.getKey()),
+					restoredShard.getKey(), restoredShard.getValue()));
 		}
-		for (Map.Entry<KinesisStreamShard, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
+		for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
 			// should get restored state belonging to itself
 			Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
-				new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue()));
+				new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredShard.getKey()),
+					restoredShard.getKey(), restoredShard.getValue()));
 		}
 	}
 
@@ -890,15 +904,15 @@ public class FlinkKinesisConsumerTest {
 		// setup initial state
 		// ----------------------------------------------------------------------
 
-		HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");
+		HashMap<StreamShardHandle, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");
 
 		// ----------------------------------------------------------------------
 		// mock operator state backend and initial state for initializeState()
 		// ----------------------------------------------------------------------
 
-		TestingListState<Tuple2<KinesisStreamShard, SequenceNumber>> listState = new TestingListState<>();
-		for (Map.Entry<KinesisStreamShard, SequenceNumber> state: fakeRestoredState.entrySet()) {
-			listState.add(Tuple2.of(state.getKey(), state.getValue()));
+		TestingListState<Tuple2<KinesisStreamShardV2, SequenceNumber>> listState = new TestingListState<>();
+		for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredState.entrySet()) {
+			listState.add(Tuple2.of(KinesisDataFetcher.createKinesisStreamShardV2(state.getKey()), state.getValue()));
 		}
 
 		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
@@ -913,9 +927,9 @@ public class FlinkKinesisConsumerTest {
 		// ----------------------------------------------------------------------
 
 		KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
-		List<KinesisStreamShard> shards = new ArrayList<>();
+		List<StreamShardHandle> shards = new ArrayList<>();
 		shards.addAll(fakeRestoredState.keySet());
-		shards.add(new KinesisStreamShard("fakeStream2",
+		shards.add(new StreamShardHandle("fakeStream2",
 			new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))));
 		when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
 		PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
@@ -934,15 +948,58 @@ public class FlinkKinesisConsumerTest {
 		consumer.open(new Configuration());
 		consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
 
-		fakeRestoredState.put(new KinesisStreamShard("fakeStream2",
+		fakeRestoredState.put(new StreamShardHandle("fakeStream2",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
 			SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
-		for (Map.Entry<KinesisStreamShard, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
+		for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
 			Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
-				new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue()));
+				new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredShard.getKey()),
+					restoredShard.getKey(), restoredShard.getValue()));
 		}
 	}
 
+	@Test
+	public void testCreateFunctionToConvertBetweenKinesisStreamShardAndKinesisStreamShardV2() { // checks FlinkKinesisConsumer.createKinesisStreamShardV2(KinesisStreamShard)
+		String streamName = "fakeStream1"; // fixture values below are shared by both shard representations
+		String shardId = "shard-000001";
+		String parentShardId = "shard-000002";
+		String adjacentParentShardId = "shard-000003";
+		String startingHashKey = "key-000001";
+		String endingHashKey = "key-000010";
+		String startingSequenceNumber = "seq-0000021";
+		String endingSequenceNumber = "seq-00000031";
+
+		KinesisStreamShardV2 kinesisStreamShardV2 = new KinesisStreamShardV2(); // expected result, populated field by field through its setters
+		kinesisStreamShardV2.setStreamName(streamName);
+		kinesisStreamShardV2.setShardId(shardId);
+		kinesisStreamShardV2.setParentShardId(parentShardId);
+		kinesisStreamShardV2.setAdjacentParentShardId(adjacentParentShardId);
+		kinesisStreamShardV2.setStartingHashKey(startingHashKey);
+		kinesisStreamShardV2.setEndingHashKey(endingHashKey);
+		kinesisStreamShardV2.setStartingSequenceNumber(startingSequenceNumber);
+		kinesisStreamShardV2.setEndingSequenceNumber(endingSequenceNumber);
+
+		Shard shard = new Shard() // the same values expressed as an AWS Shard (hash key range and sequence number range are nested objects)
+			.withShardId(shardId)
+			.withParentShardId(parentShardId)
+			.withAdjacentParentShardId(adjacentParentShardId)
+			.withHashKeyRange(new HashKeyRange()
+				.withStartingHashKey(startingHashKey)
+				.withEndingHashKey(endingHashKey))
+			.withSequenceNumberRange(new SequenceNumberRange()
+				.withStartingSequenceNumber(startingSequenceNumber)
+				.withEndingSequenceNumber(endingSequenceNumber));
+		KinesisStreamShard kinesisStreamShard = new KinesisStreamShard(streamName, shard); // legacy representation: stream name plus the AWS Shard
+
+		assertEquals(kinesisStreamShardV2, FlinkKinesisConsumer.createKinesisStreamShardV2(kinesisStreamShard)); // only legacy -> V2 is exercised; NOTE(review): presumably relies on a field-wise KinesisStreamShardV2#equals - confirm
+	}
+
+	@Test
+	public void testKinesisStreamShardV2WillUsePojoSerializer() { // KinesisStreamShardV2 must resolve to Flink's PojoSerializer
+		TypeInformation<KinesisStreamShardV2> typeInformation = TypeInformation.of(KinesisStreamShardV2.class);
+		assertTrue(typeInformation.createSerializer(new ExecutionConfig()) instanceof PojoSerializer); // serializer is created from a freshly constructed ExecutionConfig with nothing set on it
+	}
+
 	private static final class TestingListState<T> implements ListState<T> {
 
 		private final List<T> list = new ArrayList<>();
@@ -973,31 +1030,31 @@ public class FlinkKinesisConsumerTest {
 		}
 	}
 
-	private HashMap<KinesisStreamShard, SequenceNumber> getFakeRestoredStore(String streamName) {
-		HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = new HashMap<>();
+	private HashMap<StreamShardHandle, SequenceNumber> getFakeRestoredStore(String streamName) {
+		HashMap<StreamShardHandle, SequenceNumber> fakeRestoredState = new HashMap<>();
 
 		if (streamName.equals("fakeStream1") || streamName.equals("all")) {
 			fakeRestoredState.put(
-				new KinesisStreamShard("fakeStream1",
+				new StreamShardHandle("fakeStream1",
 					new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
 				new SequenceNumber(UUID.randomUUID().toString()));
 			fakeRestoredState.put(
-				new KinesisStreamShard("fakeStream1",
+				new StreamShardHandle("fakeStream1",
 					new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
 				new SequenceNumber(UUID.randomUUID().toString()));
 			fakeRestoredState.put(
-				new KinesisStreamShard("fakeStream1",
+				new StreamShardHandle("fakeStream1",
 					new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
 				new SequenceNumber(UUID.randomUUID().toString()));
 		}
 
 		if (streamName.equals("fakeStream2") || streamName.equals("all")) {
 			fakeRestoredState.put(
-				new KinesisStreamShard("fakeStream2",
+				new StreamShardHandle("fakeStream2",
 					new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
 				new SequenceNumber(UUID.randomUUID().toString()));
 			fakeRestoredState.put(
-				new KinesisStreamShard("fakeStream2",
+				new StreamShardHandle("fakeStream2",
 					new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
 				new SequenceNumber(UUID.randomUUID().toString()));
 		}