Posted to commits@flink.apache.org by tz...@apache.org on 2017/05/26 08:42:00 UTC

[1/8] flink git commit: [FLINK-6714] [runtime] Use user classloader for operator state copying on snapshots

Repository: flink
Updated Branches:
  refs/heads/release-1.3 401e99759 -> 7fe4df336


[FLINK-6714] [runtime] Use user classloader for operator state copying on snapshots

This closes #3987.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e0454234
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e0454234
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e0454234

Branch: refs/heads/release-1.3
Commit: e0454234748f838799f905e9271253dfa808a797
Parents: 401e997
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Thu May 25 16:54:08 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri May 26 16:31:19 2017 +0800

----------------------------------------------------------------------
 .../state/DefaultOperatorStateBackend.java      | 21 ++++++----
 .../runtime/state/OperatorStateBackendTest.java | 42 ++++++++++++++++++++
 2 files changed, 55 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e0454234/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index eec2e93..0f96dac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -185,13 +185,18 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 				new HashMap<>(registeredStates.size());
 
 		// eagerly create deep copies of the list states in the sync phase, so that we can use them in the async writing
-		for (Map.Entry<String, PartitionableListState<?>> entry : this.registeredStates.entrySet()) {
-
-			PartitionableListState<?> listState = entry.getValue();
-			if (null != listState) {
-				listState = listState.deepCopy();
+		ClassLoader snapshotClassLoader = Thread.currentThread().getContextClassLoader();
+		Thread.currentThread().setContextClassLoader(userClassloader);
+		try {
+			for (Map.Entry<String, PartitionableListState<?>> entry : this.registeredStates.entrySet()) {
+				PartitionableListState<?> listState = entry.getValue();
+				if (null != listState) {
+					listState = listState.deepCopy();
+				}
+				registeredStatesDeepCopies.put(entry.getKey(), listState);
 			}
-			registeredStatesDeepCopies.put(entry.getKey(), listState);
+		} finally {
+			Thread.currentThread().setContextClassLoader(snapshotClassLoader);
 		}
 
 		// implementation of the async IO operation, based on FutureTask
@@ -258,8 +263,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 			task.run();
 		}
 
-		LOG.info("DefaultOperatorStateBackend snapshot (" + streamFactory + ", synchronous part) in thread " +
-				Thread.currentThread() + " took " + (System.currentTimeMillis() - syncStartTime) + " ms.");
+		LOG.info("DefaultOperatorStateBackend snapshot ({}, synchronous part) in thread {} took {} ms.",
+				streamFactory, Thread.currentThread(), (System.currentTimeMillis() - syncStartTime));
 
 		return task;
 	}
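
The fix above boils down to a small, reusable pattern: swap the thread's context classloader to the user classloader for the duration of the synchronous deep-copy phase, and restore it in a finally block so the swap survives exceptions. A minimal standalone sketch of that pattern (plain Java, not Flink code; the helper name and the Callable-based shape are illustrative):

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;

public class ClassLoaderSwapExample {

	/** Runs the given copy logic with the user classloader installed as the context classloader. */
	static <T> T runWithUserClassLoader(ClassLoader userClassLoader, Callable<T> copyLogic) throws Exception {
		final Thread current = Thread.currentThread();
		final ClassLoader previous = current.getContextClassLoader();
		current.setContextClassLoader(userClassLoader);
		try {
			return copyLogic.call();
		} finally {
			// always restore the original classloader, even if the copy throws
			current.setContextClassLoader(previous);
		}
	}

	public static void main(String[] args) throws Exception {
		// stand-in for the user classloader a Flink job would provide
		ClassLoader fakeUserLoader = ClassLoaderSwapExample.class.getClassLoader();

		Map<String, String> copies = runWithUserClassLoader(fakeUserLoader, () -> {
			// stand-in for the deep-copy loop over registered list states
			Map<String, String> deepCopies = new HashMap<>();
			deepCopies.put("some-state",
				"copied under TCCL: " + Thread.currentThread().getContextClassLoader());
			return deepCopies;
		});
		System.out.println(copies);
	}
}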

http://git-wip-us.apache.org/repos/asf/flink/blob/e0454234/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 31b75c7..d44f6c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.testutils.OneShotLatch;
@@ -35,6 +36,9 @@ import org.apache.flink.util.FutureUtil;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
@@ -61,6 +65,7 @@ import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @RunWith(PowerMockRunner.class)
@@ -208,6 +213,43 @@ public class OperatorStateBackendTest {
 		}
 	}
 
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testCorrectClassLoaderUsedOnSnapshot() throws Exception {
+
+		AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);
+
+		final Environment env = createMockEnvironment();
+		OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend(env, "test-op-name");
+
+		// mock serializer which tests that on copy, the correct classloader is used as the context classloader
+		TypeSerializer<Integer> mockSerializer = mock(TypeSerializer.class);
+		when(mockSerializer.copy(Matchers.any(Integer.class))).thenAnswer(new Answer<Object>() {
+			@Override
+			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+				Assert.assertEquals(env.getUserClassLoader(), Thread.currentThread().getContextClassLoader());
+				return null;
+			}
+		});
+		// return actual serializers / config snapshots so that the snapshot proceeds properly
+		when(mockSerializer.duplicate()).thenReturn(IntSerializer.INSTANCE);
+		when(mockSerializer.snapshotConfiguration()).thenReturn(IntSerializer.INSTANCE.snapshotConfiguration());
+
+		// write some state
+		ListStateDescriptor<Integer> stateDescriptor = new ListStateDescriptor<>("test", mockSerializer);
+		ListState<Integer> listState = operatorStateBackend.getListState(stateDescriptor);
+
+		listState.add(42);
+
+		CheckpointStreamFactory streamFactory = abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
+		RunnableFuture<OperatorStateHandle> runnableFuture =
+			operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint());
+		FutureUtil.runIfNotDoneAndGet(runnableFuture);
+
+		// make sure that the method of interest is called
+		verify(mockSerializer).copy(Matchers.any(Integer.class));
+	}
+
 	@Test
 	public void testSnapshotEmpty() throws Exception {
 		final AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);


[8/8] flink git commit: [FLINK-6653] [kinesis] Improvements to removal of AWS's Shard class in checkpoints

Posted by tz...@apache.org.
[FLINK-6653] [kinesis] Improvements to removal of AWS's Shard class in checkpoints

This closes #3994.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7fe4df33
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7fe4df33
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7fe4df33

Branch: refs/heads/release-1.3
Commit: 7fe4df3361136755e0ca9c5647b178fdb65053f2
Parents: 64ca1aa
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri May 26 15:56:54 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri May 26 16:33:50 2017 +0800

----------------------------------------------------------------------
 .../kinesis/FlinkKinesisConsumer.java           |  55 ++----
 .../kinesis/internals/KinesisDataFetcher.java   |  74 ++++----
 .../kinesis/model/KinesisStreamShard.java       |  27 +++
 .../kinesis/model/KinesisStreamShardState.java  |  30 ++--
 .../kinesis/model/KinesisStreamShardV2.java     | 171 ------------------
 .../kinesis/model/StreamShardMetadata.java      | 173 +++++++++++++++++++
 .../FlinkKinesisConsumerMigrationTest.java      |   6 +-
 .../kinesis/FlinkKinesisConsumerTest.java       |  94 +++++-----
 .../internals/KinesisDataFetcherTest.java       |  18 +-
 .../kinesis/internals/ShardConsumerTest.java    |   4 +-
 10 files changed, 333 insertions(+), 319 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7fe4df33/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index b7f5506..ea76ccc 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -39,7 +39,7 @@ import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardSta
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardV2;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
@@ -100,7 +100,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 	private transient KinesisDataFetcher<T> fetcher;
 
 	/** The sequence numbers to restore to upon restore from failure */
-	private transient HashMap<KinesisStreamShardV2, SequenceNumber> sequenceNumsToRestore;
+	private transient HashMap<StreamShardMetadata, SequenceNumber> sequenceNumsToRestore;
 
 	private volatile boolean running = true;
 
@@ -111,7 +111,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 	/** State name to access shard sequence number states; cannot be changed */
 	private static final String sequenceNumsStateStoreName = "Kinesis-Stream-Shard-State";
 
-	private transient ListState<Tuple2<KinesisStreamShardV2, SequenceNumber>> sequenceNumsStateForCheckpoint;
+	private transient ListState<Tuple2<StreamShardMetadata, SequenceNumber>> sequenceNumsStateForCheckpoint;
 
 	// ------------------------------------------------------------------------
 	//  Constructors
@@ -202,7 +202,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 		List<StreamShardHandle> allShards = fetcher.discoverNewShardsToSubscribe();
 
 		for (StreamShardHandle shard : allShards) {
-			KinesisStreamShardV2 kinesisStreamShard = KinesisDataFetcher.createKinesisStreamShardV2(shard);
+			StreamShardMetadata kinesisStreamShard = KinesisDataFetcher.convertToStreamShardMetadata(shard);
 			if (sequenceNumsToRestore != null) {
 				if (sequenceNumsToRestore.containsKey(kinesisStreamShard)) {
 					// if the shard was already seen and is contained in the state,
@@ -298,8 +298,8 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 
 	@Override
 	public void initializeState(FunctionInitializationContext context) throws Exception {
-		TypeInformation<Tuple2<KinesisStreamShardV2, SequenceNumber>> shardsStateTypeInfo = new TupleTypeInfo<>(
-			TypeInformation.of(KinesisStreamShardV2.class),
+		TypeInformation<Tuple2<StreamShardMetadata, SequenceNumber>> shardsStateTypeInfo = new TupleTypeInfo<>(
+			TypeInformation.of(StreamShardMetadata.class),
 			TypeInformation.of(SequenceNumber.class));
 
 		sequenceNumsStateForCheckpoint = context.getOperatorStateStore().getUnionListState(
@@ -308,7 +308,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 		if (context.isRestored()) {
 			if (sequenceNumsToRestore == null) {
 				sequenceNumsToRestore = new HashMap<>();
-				for (Tuple2<KinesisStreamShardV2, SequenceNumber> kinesisSequenceNumber : sequenceNumsStateForCheckpoint.get()) {
+				for (Tuple2<StreamShardMetadata, SequenceNumber> kinesisSequenceNumber : sequenceNumsStateForCheckpoint.get()) {
 					sequenceNumsToRestore.put(kinesisSequenceNumber.f0, kinesisSequenceNumber.f1);
 				}
 
@@ -333,12 +333,12 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 
 			if (fetcher == null) {
 				if (sequenceNumsToRestore != null) {
-					for (Map.Entry<KinesisStreamShardV2, SequenceNumber> entry : sequenceNumsToRestore.entrySet()) {
+					for (Map.Entry<StreamShardMetadata, SequenceNumber> entry : sequenceNumsToRestore.entrySet()) {
 						// sequenceNumsToRestore is the restored global union state;
 						// should only snapshot shards that actually belong to us
 
 						if (KinesisDataFetcher.isThisSubtaskShouldSubscribeTo(
-								KinesisDataFetcher.createStreamShardHandle(entry.getKey()),
+								KinesisDataFetcher.convertToStreamShardHandle(entry.getKey()),
 								getRuntimeContext().getNumberOfParallelSubtasks(),
 								getRuntimeContext().getIndexOfThisSubtask())) {
 
@@ -347,14 +347,14 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 					}
 				}
 			} else {
-				HashMap<KinesisStreamShardV2, SequenceNumber> lastStateSnapshot = fetcher.snapshotState();
+				HashMap<StreamShardMetadata, SequenceNumber> lastStateSnapshot = fetcher.snapshotState();
 
 				if (LOG.isDebugEnabled()) {
 					LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
 						lastStateSnapshot.toString(), context.getCheckpointId(), context.getCheckpointTimestamp());
 				}
 
-				for (Map.Entry<KinesisStreamShardV2, SequenceNumber> entry : lastStateSnapshot.entrySet()) {
+				for (Map.Entry<StreamShardMetadata, SequenceNumber> entry : lastStateSnapshot.entrySet()) {
 					sequenceNumsStateForCheckpoint.add(Tuple2.of(entry.getKey(), entry.getValue()));
 				}
 			}
@@ -370,8 +370,10 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 			sequenceNumsToRestore = null;
 		} else {
 			sequenceNumsToRestore = new HashMap<>();
-			for (Map.Entry<KinesisStreamShard, SequenceNumber> kv: restoredState.entrySet()) {
-				sequenceNumsToRestore.put(createKinesisStreamShardV2(kv.getKey()), kv.getValue());
+			for (Map.Entry<KinesisStreamShard, SequenceNumber> stateEntry : restoredState.entrySet()) {
+				sequenceNumsToRestore.put(
+						KinesisStreamShard.convertToStreamShardMetadata(stateEntry.getKey()),
+						stateEntry.getValue());
 			}
 		}
 	}
@@ -388,32 +390,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 	}
 
 	@VisibleForTesting
-	HashMap<KinesisStreamShardV2, SequenceNumber> getRestoredState() {
+	HashMap<StreamShardMetadata, SequenceNumber> getRestoredState() {
 		return sequenceNumsToRestore;
 	}
-
-	/**
-	 * Utility function to convert {@link KinesisStreamShard} into {@link KinesisStreamShardV2}
-	 *
-	 * @param kinesisStreamShard the {@link KinesisStreamShard} to be converted
-	 * @return a {@link KinesisStreamShardV2} object
-	 */
-	public static KinesisStreamShardV2 createKinesisStreamShardV2(KinesisStreamShard kinesisStreamShard) {
-		KinesisStreamShardV2 kinesisStreamShardV2 = new KinesisStreamShardV2();
-
-		kinesisStreamShardV2.setStreamName(kinesisStreamShard.getStreamName());
-		kinesisStreamShardV2.setShardId(kinesisStreamShard.getShard().getShardId());
-		kinesisStreamShardV2.setParentShardId(kinesisStreamShard.getShard().getParentShardId());
-		kinesisStreamShardV2.setAdjacentParentShardId(kinesisStreamShard.getShard().getAdjacentParentShardId());
-		if (kinesisStreamShard.getShard().getHashKeyRange() != null) {
-			kinesisStreamShardV2.setStartingHashKey(kinesisStreamShard.getShard().getHashKeyRange().getStartingHashKey());
-			kinesisStreamShardV2.setEndingHashKey(kinesisStreamShard.getShard().getHashKeyRange().getEndingHashKey());
-		}
-		if (kinesisStreamShard.getShard().getSequenceNumberRange() != null) {
-			kinesisStreamShardV2.setStartingSequenceNumber(kinesisStreamShard.getShard().getSequenceNumberRange().getStartingSequenceNumber());
-			kinesisStreamShardV2.setEndingSequenceNumber(kinesisStreamShard.getShard().getSequenceNumberRange().getEndingSequenceNumber());
-		}
-
-		return kinesisStreamShardV2;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe4df33/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index b0dceec..11ac6d4 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardV2;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
 import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
@@ -303,7 +303,7 @@ public class KinesisDataFetcher<T> {
 				// since there may be delay in discovering a new shard, all new shards due to
 				// resharding should be read starting from the earliest record possible
 				KinesisStreamShardState newShardState =
-					new KinesisStreamShardState(createKinesisStreamShardV2(shard), shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
+					new KinesisStreamShardState(convertToStreamShardMetadata(shard), shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
 				int newStateIndex = registerNewSubscribedShardState(newShardState);
 
 				if (LOG.isInfoEnabled()) {
@@ -353,13 +353,13 @@ public class KinesisDataFetcher<T> {
 	 *
 	 * @return state snapshot
 	 */
-	public HashMap<KinesisStreamShardV2, SequenceNumber> snapshotState() {
+	public HashMap<StreamShardMetadata, SequenceNumber> snapshotState() {
 		// this method assumes that the checkpoint lock is held
 		assert Thread.holdsLock(checkpointLock);
 
-		HashMap<KinesisStreamShardV2, SequenceNumber> stateSnapshot = new HashMap<>();
+		HashMap<StreamShardMetadata, SequenceNumber> stateSnapshot = new HashMap<>();
 		for (KinesisStreamShardState shardWithState : subscribedShardsState) {
-			stateSnapshot.put(shardWithState.getKinesisStreamShard(), shardWithState.getLastProcessedSequenceNum());
+			stateSnapshot.put(shardWithState.getStreamShardMetadata(), shardWithState.getLastProcessedSequenceNum());
 		}
 		return stateSnapshot;
 	}
@@ -588,56 +588,54 @@ public class KinesisDataFetcher<T> {
 	}
 
 	/**
-	 * Utility function to convert {@link StreamShardHandle} into {@link KinesisStreamShardV2}
+	 * Utility function to convert {@link StreamShardHandle} into {@link StreamShardMetadata}
 	 *
 	 * @param streamShardHandle the {@link StreamShardHandle} to be converted
-	 * @return a {@link KinesisStreamShardV2} object
+	 * @return a {@link StreamShardMetadata} object
 	 */
-	public static KinesisStreamShardV2 createKinesisStreamShardV2(StreamShardHandle streamShardHandle) {
-		KinesisStreamShardV2 kinesisStreamShardV2 = new KinesisStreamShardV2();
+	public static StreamShardMetadata convertToStreamShardMetadata(StreamShardHandle streamShardHandle) {
+		StreamShardMetadata streamShardMetadata = new StreamShardMetadata();
+
+		streamShardMetadata.setStreamName(streamShardHandle.getStreamName());
+		streamShardMetadata.setShardId(streamShardHandle.getShard().getShardId());
+		streamShardMetadata.setParentShardId(streamShardHandle.getShard().getParentShardId());
+		streamShardMetadata.setAdjacentParentShardId(streamShardHandle.getShard().getAdjacentParentShardId());
 
-		kinesisStreamShardV2.setStreamName(streamShardHandle.getStreamName());
-		kinesisStreamShardV2.setShardId(streamShardHandle.getShard().getShardId());
-		kinesisStreamShardV2.setParentShardId(streamShardHandle.getShard().getParentShardId());
-		kinesisStreamShardV2.setAdjacentParentShardId(streamShardHandle.getShard().getAdjacentParentShardId());
 		if (streamShardHandle.getShard().getHashKeyRange() != null) {
-			kinesisStreamShardV2.setStartingHashKey(streamShardHandle.getShard().getHashKeyRange().getStartingHashKey());
-			kinesisStreamShardV2.setEndingHashKey(streamShardHandle.getShard().getHashKeyRange().getEndingHashKey());
+			streamShardMetadata.setStartingHashKey(streamShardHandle.getShard().getHashKeyRange().getStartingHashKey());
+			streamShardMetadata.setEndingHashKey(streamShardHandle.getShard().getHashKeyRange().getEndingHashKey());
 		}
+
 		if (streamShardHandle.getShard().getSequenceNumberRange() != null) {
-			kinesisStreamShardV2.setStartingSequenceNumber(streamShardHandle.getShard().getSequenceNumberRange().getStartingSequenceNumber());
-			kinesisStreamShardV2.setEndingSequenceNumber(streamShardHandle.getShard().getSequenceNumberRange().getEndingSequenceNumber());
+			streamShardMetadata.setStartingSequenceNumber(streamShardHandle.getShard().getSequenceNumberRange().getStartingSequenceNumber());
+			streamShardMetadata.setEndingSequenceNumber(streamShardHandle.getShard().getSequenceNumberRange().getEndingSequenceNumber());
 		}
 
-		return kinesisStreamShardV2;
+		return streamShardMetadata;
 	}
 
 	/**
-	 * Utility function to convert {@link KinesisStreamShardV2} into {@link StreamShardHandle}
+	 * Utility function to convert {@link StreamShardMetadata} into {@link StreamShardHandle}
 	 *
-	 * @param kinesisStreamShard the {@link KinesisStreamShardV2} to be converted
+	 * @param streamShardMetadata the {@link StreamShardMetadata} to be converted
 	 * @return a {@link StreamShardHandle} object
 	 */
-	public static StreamShardHandle createStreamShardHandle(KinesisStreamShardV2 kinesisStreamShard) {
+	public static StreamShardHandle convertToStreamShardHandle(StreamShardMetadata streamShardMetadata) {
 		Shard shard = new Shard();
-		shard.withShardId(kinesisStreamShard.getShardId());
-		shard.withParentShardId(kinesisStreamShard.getParentShardId());
-		shard.withAdjacentParentShardId(kinesisStreamShard.getAdjacentParentShardId());
-
-		if (kinesisStreamShard.getStartingHashKey() != null && kinesisStreamShard.getEndingHashKey() != null) {
-			HashKeyRange hashKeyRange = new HashKeyRange();
-			hashKeyRange.withStartingHashKey(kinesisStreamShard.getStartingHashKey());
-			hashKeyRange.withEndingHashKey(kinesisStreamShard.getEndingHashKey());
-			shard.withHashKeyRange(hashKeyRange);
-		}
+		shard.withShardId(streamShardMetadata.getShardId());
+		shard.withParentShardId(streamShardMetadata.getParentShardId());
+		shard.withAdjacentParentShardId(streamShardMetadata.getAdjacentParentShardId());
 
-		if (kinesisStreamShard.getStartingSequenceNumber() != null && kinesisStreamShard.getEndingSequenceNumber() != null) {
-			SequenceNumberRange sequenceNumberRange = new SequenceNumberRange();
-			sequenceNumberRange.withStartingSequenceNumber(kinesisStreamShard.getStartingSequenceNumber());
-			sequenceNumberRange.withEndingSequenceNumber(kinesisStreamShard.getEndingSequenceNumber());
-			shard.withSequenceNumberRange(sequenceNumberRange);
-		}
+		HashKeyRange hashKeyRange = new HashKeyRange();
+		hashKeyRange.withStartingHashKey(streamShardMetadata.getStartingHashKey());
+		hashKeyRange.withEndingHashKey(streamShardMetadata.getEndingHashKey());
+		shard.withHashKeyRange(hashKeyRange);
+
+		SequenceNumberRange sequenceNumberRange = new SequenceNumberRange();
+		sequenceNumberRange.withStartingSequenceNumber(streamShardMetadata.getStartingSequenceNumber());
+		sequenceNumberRange.withEndingSequenceNumber(streamShardMetadata.getEndingSequenceNumber());
+		shard.withSequenceNumberRange(sequenceNumberRange);
 
-		return new StreamShardHandle(kinesisStreamShard.getStreamName(), shard);
+		return new StreamShardHandle(streamShardMetadata.getStreamName(), shard);
 	}
 }
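
Putting the two converters together: at runtime the fetcher works with StreamShardHandle (which still wraps the AWS SDK's Shard), while only the SDK-independent StreamShardMetadata is written to checkpoints, and the handle is rebuilt from it on restore. A minimal usage sketch (the stream and shard values are made up; assumes the AWS SDK v1 Kinesis model classes used throughout this connector):

import com.amazonaws.services.kinesis.model.HashKeyRange;
import com.amazonaws.services.kinesis.model.SequenceNumberRange;
import com.amazonaws.services.kinesis.model.Shard;
import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;

public class ShardConversionExample {
	public static void main(String[] args) {
		Shard shard = new Shard()
			.withShardId("shardId-000000000000")
			.withHashKeyRange(new HashKeyRange()
				.withStartingHashKey("0")
				.withEndingHashKey("340282366920938463463374607431768211455"))
			.withSequenceNumberRange(new SequenceNumberRange()
				.withStartingSequenceNumber("495469866831355936"));
		StreamShardHandle handle = new StreamShardHandle("fakeStream", shard);

		// what actually goes into the checkpoint: SDK-independent metadata
		StreamShardMetadata metadata = KinesisDataFetcher.convertToStreamShardMetadata(handle);

		// on restore, a handle is rebuilt from the checkpointed metadata
		StreamShardHandle restored = KinesisDataFetcher.convertToStreamShardHandle(metadata);

		// the round trip preserves the checkpointed view of the shard
		System.out.println(metadata.equals(KinesisDataFetcher.convertToStreamShardMetadata(restored))); // true
	}
}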

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe4df33/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
index f3dcfe1..592e30d 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
@@ -129,4 +129,31 @@ public class KinesisStreamShard implements Serializable {
 		if (shardId == null) { return false; }
 		return shardId.matches("^shardId-\\d{12}");
 	}
+
+	/**
+	 * Utility function to convert {@link KinesisStreamShard} into the new {@link StreamShardMetadata} model.
+	 *
+	 * @param kinesisStreamShard the {@link KinesisStreamShard} to be converted
+	 * @return the converted {@link StreamShardMetadata}
+	 */
+	public static StreamShardMetadata convertToStreamShardMetadata(KinesisStreamShard kinesisStreamShard) {
+		StreamShardMetadata streamShardMetadata = new StreamShardMetadata();
+
+		streamShardMetadata.setStreamName(kinesisStreamShard.getStreamName());
+		streamShardMetadata.setShardId(kinesisStreamShard.getShard().getShardId());
+		streamShardMetadata.setParentShardId(kinesisStreamShard.getShard().getParentShardId());
+		streamShardMetadata.setAdjacentParentShardId(kinesisStreamShard.getShard().getAdjacentParentShardId());
+
+		if (kinesisStreamShard.getShard().getHashKeyRange() != null) {
+			streamShardMetadata.setStartingHashKey(kinesisStreamShard.getShard().getHashKeyRange().getStartingHashKey());
+			streamShardMetadata.setEndingHashKey(kinesisStreamShard.getShard().getHashKeyRange().getEndingHashKey());
+		}
+		
+		if (kinesisStreamShard.getShard().getSequenceNumberRange() != null) {
+			streamShardMetadata.setStartingSequenceNumber(kinesisStreamShard.getShard().getSequenceNumberRange().getStartingSequenceNumber());
+			streamShardMetadata.setEndingSequenceNumber(kinesisStreamShard.getShard().getSequenceNumberRange().getEndingSequenceNumber());
+		}
+
+		return streamShardMetadata;
+	}
 }
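
The legacy model class gains the matching converter, so pre-1.3 state can be mapped into the new representation during restore (restoreState(...) above applies it to every entry). A small sketch using the constructor shape seen in the migration test further below (values illustrative):

import com.amazonaws.services.kinesis.model.Shard;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;

public class LegacyShardConversionExample {
	public static void main(String[] args) {
		// the pre-1.3 state model still wraps the AWS SDK's Shard directly
		KinesisStreamShard legacy = new KinesisStreamShard(
			"fakeStream1", new Shard().withShardId("shardId-000000000000"));

		// restore path: map the legacy entry into the SDK-independent model
		StreamShardMetadata metadata = KinesisStreamShard.convertToStreamShardMetadata(legacy);
		System.out.println(metadata);
	}
}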

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe4df33/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
index e68129d..4b1cc1c 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
@@ -17,23 +17,33 @@
 
 package org.apache.flink.streaming.connectors.kinesis.model;
 
+import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.flink.util.Preconditions;
+
 /**
  * A wrapper class that bundles a {@link StreamShardHandle} with its last processed sequence number.
  */
 public class KinesisStreamShardState {
 
-	private KinesisStreamShardV2 kinesisStreamShard;
+	/** A handle object that wraps the actual {@link Shard} instance and stream name. */
 	private StreamShardHandle streamShardHandle;
+
+	/** The checkpointed state for each Kinesis stream shard. */
+	private StreamShardMetadata streamShardMetadata;
 	private SequenceNumber lastProcessedSequenceNum;
 
-	public KinesisStreamShardState(KinesisStreamShardV2 kinesisStreamShard, StreamShardHandle streamShardHandle, SequenceNumber lastProcessedSequenceNum) {
-		this.kinesisStreamShard = kinesisStreamShard;
-		this.streamShardHandle = streamShardHandle;
-		this.lastProcessedSequenceNum = lastProcessedSequenceNum;
+	public KinesisStreamShardState(
+			StreamShardMetadata streamShardMetadata,
+			StreamShardHandle streamShardHandle,
+			SequenceNumber lastProcessedSequenceNum) {
+
+		this.streamShardMetadata = Preconditions.checkNotNull(streamShardMetadata);
+		this.streamShardHandle = Preconditions.checkNotNull(streamShardHandle);
+		this.lastProcessedSequenceNum = Preconditions.checkNotNull(lastProcessedSequenceNum);
 	}
 
-	public KinesisStreamShardV2 getKinesisStreamShard() {
-		return this.kinesisStreamShard;
+	public StreamShardMetadata getStreamShardMetadata() {
+		return this.streamShardMetadata;
 	}
 
 	public StreamShardHandle getStreamShardHandle() {
@@ -51,7 +61,7 @@ public class KinesisStreamShardState {
 	@Override
 	public String toString() {
 		return "KinesisStreamShardState{" +
-			"kinesisStreamShard='" + kinesisStreamShard.toString() + "'" +
+			"streamShardMetadata='" + streamShardMetadata.toString() + "'" +
 			", streamShardHandle='" + streamShardHandle.toString() + "'" +
 			", lastProcessedSequenceNumber='" + lastProcessedSequenceNum.toString() + "'}";
 	}
@@ -68,13 +78,13 @@ public class KinesisStreamShardState {
 
 		KinesisStreamShardState other = (KinesisStreamShardState) obj;
 
-		return kinesisStreamShard.equals(other.getKinesisStreamShard()) &&
+		return streamShardMetadata.equals(other.getStreamShardMetadata()) &&
 			streamShardHandle.equals(other.getStreamShardHandle()) &&
 			lastProcessedSequenceNum.equals(other.getLastProcessedSequenceNum());
 	}
 
 	@Override
 	public int hashCode() {
-		return 37 * (kinesisStreamShard.hashCode() + streamShardHandle.hashCode() + lastProcessedSequenceNum.hashCode());
+		return 37 * (streamShardMetadata.hashCode() + streamShardHandle.hashCode() + lastProcessedSequenceNum.hashCode());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe4df33/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardV2.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardV2.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardV2.java
deleted file mode 100644
index 71cb6fa..0000000
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardV2.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.model;
-
-import java.io.Serializable;
-import java.util.Objects;
-
-/**
- * A serializable representation of a AWS Kinesis Stream shard. It is basically a wrapper class around the information
- * disintegrating from {@link com.amazonaws.services.kinesis.model.Shard} and its nested classes.
- */
-public class KinesisStreamShardV2 implements Serializable {
-
-	private static final long serialVersionUID = 5134869582298563604L;
-
-	private String streamName;
-	private String shardId;
-	private String parentShardId;
-	private String adjacentParentShardId;
-	private String startingHashKey;
-	private String endingHashKey;
-	private String startingSequenceNumber;
-	private String endingSequenceNumber;
-
-	public void setStreamName(String streamName) {
-		this.streamName = streamName;
-	}
-
-	public void setShardId(String shardId) {
-		this.shardId = shardId;
-	}
-
-	public void setParentShardId(String parentShardId) {
-		this.parentShardId = parentShardId;
-	}
-
-	public void setAdjacentParentShardId(String adjacentParentShardId) {
-		this.adjacentParentShardId = adjacentParentShardId;
-	}
-
-	public void setStartingHashKey(String startingHashKey) {
-		this.startingHashKey = startingHashKey;
-	}
-
-	public void setEndingHashKey(String endingHashKey) {
-		this.endingHashKey = endingHashKey;
-	}
-
-	public void setStartingSequenceNumber(String startingSequenceNumber) {
-		this.startingSequenceNumber = startingSequenceNumber;
-	}
-
-	public void setEndingSequenceNumber(String endingSequenceNumber) {
-		this.endingSequenceNumber = endingSequenceNumber;
-	}
-
-	public String getStreamName() {
-		return this.streamName;
-	}
-
-	public String getShardId() {
-		return this.shardId;
-	}
-
-	public String getParentShardId() {
-		return this.parentShardId;
-	}
-
-	public String getAdjacentParentShardId() {
-		return this.adjacentParentShardId;
-	}
-
-	public String getStartingHashKey() {
-		return this.startingHashKey;
-	}
-
-	public String getEndingHashKey() {
-		return this.endingHashKey;
-	}
-
-	public String getStartingSequenceNumber() {
-		return this.startingSequenceNumber;
-	}
-
-	public String getEndingSequenceNumber() {
-		return this.endingSequenceNumber;
-	}
-
-	@Override
-	public String toString() {
-		return "KinesisStreamShardV2{" +
-			"streamName='" + streamName + "'" +
-			", shardId='" + shardId + "'" +
-			", parentShardId='" + parentShardId + "'" +
-			", adjacentParentShardId='" + adjacentParentShardId + "'" +
-			", startingHashKey='" + startingHashKey + "'" +
-			", endingHashKey='" + endingHashKey + "'" +
-			", startingSequenceNumber='" + startingSequenceNumber + "'" +
-			", endingSequenceNumber='" + endingSequenceNumber + "'}";
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (!(obj instanceof KinesisStreamShardV2)) {
-			return false;
-		}
-
-		if (obj == this) {
-			return true;
-		}
-
-		KinesisStreamShardV2 other = (KinesisStreamShardV2) obj;
-
-		return streamName.equals(other.getStreamName()) &&
-			shardId.equals(other.getShardId()) &&
-			Objects.equals(parentShardId, other.getParentShardId()) &&
-			Objects.equals(adjacentParentShardId, other.getAdjacentParentShardId()) &&
-			Objects.equals(startingHashKey, other.getStartingHashKey()) &&
-			Objects.equals(endingHashKey, other.getEndingHashKey()) &&
-			Objects.equals(startingSequenceNumber, other.getStartingSequenceNumber()) &&
-			Objects.equals(endingSequenceNumber, other.getEndingSequenceNumber());
-	}
-
-	@Override
-	public int hashCode() {
-		int hash = 17;
-
-		if (streamName != null) {
-			hash = 37 * hash + streamName.hashCode();
-		}
-		if (shardId != null) {
-			hash = 37 * hash + shardId.hashCode();
-		}
-		if (parentShardId != null) {
-			hash = 37 * hash + parentShardId.hashCode();
-		}
-		if (adjacentParentShardId != null) {
-			hash = 37 * hash + adjacentParentShardId.hashCode();
-		}
-		if (startingHashKey != null) {
-			hash = 37 * hash + startingHashKey.hashCode();
-		}
-		if (endingHashKey != null) {
-			hash = 37 * hash + endingHashKey.hashCode();
-		}
-		if (startingSequenceNumber != null) {
-			hash = 37 * hash + startingSequenceNumber.hashCode();
-		}
-		if (endingSequenceNumber != null) {
-			hash = 37 * hash + endingSequenceNumber.hashCode();
-		}
-
-		return hash;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe4df33/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardMetadata.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardMetadata.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardMetadata.java
new file mode 100644
index 0000000..a158a8b
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardMetadata.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.model;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * A serializable representation of an AWS Kinesis stream shard. It is essentially a wrapper class around the
+ * information extracted from {@link com.amazonaws.services.kinesis.model.Shard} and its nested classes. The
+ * extraction is required to avoid being locked into a specific AWS SDK version, so that the consumer's
+ * checkpointed state remains backwards compatible.
+ */
+public class StreamShardMetadata implements Serializable {
+
+	private static final long serialVersionUID = 5134869582298563604L;
+
+	private String streamName;
+	private String shardId;
+	private String parentShardId;
+	private String adjacentParentShardId;
+	private String startingHashKey;
+	private String endingHashKey;
+	private String startingSequenceNumber;
+	private String endingSequenceNumber;
+
+	public void setStreamName(String streamName) {
+		this.streamName = streamName;
+	}
+
+	public void setShardId(String shardId) {
+		this.shardId = shardId;
+	}
+
+	public void setParentShardId(String parentShardId) {
+		this.parentShardId = parentShardId;
+	}
+
+	public void setAdjacentParentShardId(String adjacentParentShardId) {
+		this.adjacentParentShardId = adjacentParentShardId;
+	}
+
+	public void setStartingHashKey(String startingHashKey) {
+		this.startingHashKey = startingHashKey;
+	}
+
+	public void setEndingHashKey(String endingHashKey) {
+		this.endingHashKey = endingHashKey;
+	}
+
+	public void setStartingSequenceNumber(String startingSequenceNumber) {
+		this.startingSequenceNumber = startingSequenceNumber;
+	}
+
+	public void setEndingSequenceNumber(String endingSequenceNumber) {
+		this.endingSequenceNumber = endingSequenceNumber;
+	}
+
+	public String getStreamName() {
+		return this.streamName;
+	}
+
+	public String getShardId() {
+		return this.shardId;
+	}
+
+	public String getParentShardId() {
+		return this.parentShardId;
+	}
+
+	public String getAdjacentParentShardId() {
+		return this.adjacentParentShardId;
+	}
+
+	public String getStartingHashKey() {
+		return this.startingHashKey;
+	}
+
+	public String getEndingHashKey() {
+		return this.endingHashKey;
+	}
+
+	public String getStartingSequenceNumber() {
+		return this.startingSequenceNumber;
+	}
+
+	public String getEndingSequenceNumber() {
+		return this.endingSequenceNumber;
+	}
+
+	@Override
+	public String toString() {
+		return "StreamShardMetadata{" +
+			"streamName='" + streamName + "'" +
+			", shardId='" + shardId + "'" +
+			", parentShardId='" + parentShardId + "'" +
+			", adjacentParentShardId='" + adjacentParentShardId + "'" +
+			", startingHashKey='" + startingHashKey + "'" +
+			", endingHashKey='" + endingHashKey + "'" +
+			", startingSequenceNumber='" + startingSequenceNumber + "'" +
+			", endingSequenceNumber='" + endingSequenceNumber + "'}";
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (!(obj instanceof StreamShardMetadata)) {
+			return false;
+		}
+
+		if (obj == this) {
+			return true;
+		}
+
+		StreamShardMetadata other = (StreamShardMetadata) obj;
+
+		return streamName.equals(other.getStreamName()) &&
+			shardId.equals(other.getShardId()) &&
+			Objects.equals(parentShardId, other.getParentShardId()) &&
+			Objects.equals(adjacentParentShardId, other.getAdjacentParentShardId()) &&
+			Objects.equals(startingHashKey, other.getStartingHashKey()) &&
+			Objects.equals(endingHashKey, other.getEndingHashKey()) &&
+			Objects.equals(startingSequenceNumber, other.getStartingSequenceNumber()) &&
+			Objects.equals(endingSequenceNumber, other.getEndingSequenceNumber());
+	}
+
+	@Override
+	public int hashCode() {
+		int hash = 17;
+
+		if (streamName != null) {
+			hash = 37 * hash + streamName.hashCode();
+		}
+		if (shardId != null) {
+			hash = 37 * hash + shardId.hashCode();
+		}
+		if (parentShardId != null) {
+			hash = 37 * hash + parentShardId.hashCode();
+		}
+		if (adjacentParentShardId != null) {
+			hash = 37 * hash + adjacentParentShardId.hashCode();
+		}
+		if (startingHashKey != null) {
+			hash = 37 * hash + startingHashKey.hashCode();
+		}
+		if (endingHashKey != null) {
+			hash = 37 * hash + endingHashKey.hashCode();
+		}
+		if (startingSequenceNumber != null) {
+			hash = 37 * hash + startingSequenceNumber.hashCode();
+		}
+		if (endingSequenceNumber != null) {
+			hash = 37 * hash + endingSequenceNumber.hashCode();
+		}
+
+		return hash;
+	}
+
+}
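
Since StreamShardMetadata is a plain Serializable POJO holding only Strings (and it keeps the serialVersionUID of the removed KinesisStreamShardV2), persisting it needs nothing beyond standard Java serialization. A minimal round-trip sketch (field values are illustrative):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;

public class MetadataRoundTripExample {
	public static void main(String[] args) throws Exception {
		StreamShardMetadata metadata = new StreamShardMetadata();
		metadata.setStreamName("fakeStream");
		metadata.setShardId("shardId-000000000000");

		// serialize with plain Java serialization
		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
		try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
			out.writeObject(metadata);
		}

		// deserialize and compare via the equals() defined above
		try (ObjectInputStream in = new ObjectInputStream(
				new ByteArrayInputStream(bytes.toByteArray()))) {
			StreamShardMetadata restored = (StreamShardMetadata) in.readObject();
			System.out.println(metadata.equals(restored)); // true
		}
	}
}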

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe4df33/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
index d2af6ad..e24a411 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardV2;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
@@ -102,8 +102,8 @@ public class FlinkKinesisConsumerMigrationTest {
 		testHarness.open();
 
 		// the expected state in "kafka-consumer-migration-test-flink1.1-snapshot"
-		final HashMap<KinesisStreamShardV2, SequenceNumber> expectedState = new HashMap<>();
-		expectedState.put(FlinkKinesisConsumer.createKinesisStreamShardV2(new KinesisStreamShard("fakeStream1",
+		final HashMap<StreamShardMetadata, SequenceNumber> expectedState = new HashMap<>();
+		expectedState.put(KinesisStreamShard.convertToStreamShardMetadata(new KinesisStreamShard("fakeStream1",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)))),
 			new SequenceNumber("987654321"));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe4df33/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index 760858a..186dfa6 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -41,7 +41,7 @@ import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardV2;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer;
@@ -543,26 +543,26 @@ public class FlinkKinesisConsumerTest {
 		config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
 		config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
 
-		List<Tuple2<KinesisStreamShardV2, SequenceNumber>> globalUnionState = new ArrayList<>(4);
+		List<Tuple2<StreamShardMetadata, SequenceNumber>> globalUnionState = new ArrayList<>(4);
 		globalUnionState.add(Tuple2.of(
-			KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream",
+			KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)))),
 			new SequenceNumber("1")));
 		globalUnionState.add(Tuple2.of(
-			KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream",
+			KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1)))),
 			new SequenceNumber("1")));
 		globalUnionState.add(Tuple2.of(
-			KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream",
+			KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2)))),
 			new SequenceNumber("1")));
 		globalUnionState.add(Tuple2.of(
-			KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream",
+			KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(3)))),
 			new SequenceNumber("1")));
 
-		TestingListState<Tuple2<KinesisStreamShardV2, SequenceNumber>> listState = new TestingListState<>();
-		for (Tuple2<KinesisStreamShardV2, SequenceNumber> state : globalUnionState) {
+		TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = new TestingListState<>();
+		for (Tuple2<StreamShardMetadata, SequenceNumber> state : globalUnionState) {
 			listState.add(state);
 		}
 
@@ -606,23 +606,23 @@ public class FlinkKinesisConsumerTest {
 		config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
 		config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
 
-		ArrayList<Tuple2<KinesisStreamShardV2, SequenceNumber>> initialState = new ArrayList<>(1);
+		ArrayList<Tuple2<StreamShardMetadata, SequenceNumber>> initialState = new ArrayList<>(1);
 		initialState.add(Tuple2.of(
-			KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream1",
+			KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream1",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)))),
 			new SequenceNumber("1")));
 
-		ArrayList<Tuple2<KinesisStreamShardV2, SequenceNumber>> expectedStateSnapshot = new ArrayList<>(3);
+		ArrayList<Tuple2<StreamShardMetadata, SequenceNumber>> expectedStateSnapshot = new ArrayList<>(3);
 		expectedStateSnapshot.add(Tuple2.of(
-			KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream1",
+			KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream1",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)))),
 			new SequenceNumber("12")));
 		expectedStateSnapshot.add(Tuple2.of(
-			KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream1",
+			KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream1",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1)))),
 			new SequenceNumber("11")));
 		expectedStateSnapshot.add(Tuple2.of(
-			KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream1",
+			KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream1",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2)))),
 			new SequenceNumber("31")));
 
@@ -630,8 +630,8 @@ public class FlinkKinesisConsumerTest {
 		// mock operator state backend and initial state for initializeState()
 		// ----------------------------------------------------------------------
 
-		TestingListState<Tuple2<KinesisStreamShardV2, SequenceNumber>> listState = new TestingListState<>();
-		for (Tuple2<KinesisStreamShardV2, SequenceNumber> state : initialState) {
+		TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = new TestingListState<>();
+		for (Tuple2<StreamShardMetadata, SequenceNumber> state : initialState) {
 			listState.add(state);
 		}
 
@@ -646,8 +646,8 @@ public class FlinkKinesisConsumerTest {
 		// mock a running fetcher and its state for snapshot
 		// ----------------------------------------------------------------------
 
-		HashMap<KinesisStreamShardV2, SequenceNumber> stateSnapshot = new HashMap<>();
-		for (Tuple2<KinesisStreamShardV2, SequenceNumber> tuple : expectedStateSnapshot) {
+		HashMap<StreamShardMetadata, SequenceNumber> stateSnapshot = new HashMap<>();
+		for (Tuple2<StreamShardMetadata, SequenceNumber> tuple : expectedStateSnapshot) {
 			stateSnapshot.put(tuple.f0, tuple.f1);
 		}
 
@@ -674,15 +674,15 @@ public class FlinkKinesisConsumerTest {
 		assertEquals(true, listState.clearCalled);
 		assertEquals(3, listState.getList().size());
 
-		for (Tuple2<KinesisStreamShardV2, SequenceNumber> state : initialState) {
-			for (Tuple2<KinesisStreamShardV2, SequenceNumber> currentState : listState.getList()) {
+		for (Tuple2<StreamShardMetadata, SequenceNumber> state : initialState) {
+			for (Tuple2<StreamShardMetadata, SequenceNumber> currentState : listState.getList()) {
 				assertNotEquals(state, currentState);
 			}
 		}
 
-		for (Tuple2<KinesisStreamShardV2, SequenceNumber> state : expectedStateSnapshot) {
+		for (Tuple2<StreamShardMetadata, SequenceNumber> state : expectedStateSnapshot) {
 			boolean hasOneIsSame = false;
-			for (Tuple2<KinesisStreamShardV2, SequenceNumber> currentState : listState.getList()) {
+			for (Tuple2<StreamShardMetadata, SequenceNumber> currentState : listState.getList()) {
 				hasOneIsSame = hasOneIsSame || state.equals(currentState);
 			}
 			assertEquals(true, hasOneIsSame);
@@ -736,7 +736,7 @@ public class FlinkKinesisConsumerTest {
 
 		for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
 			Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
-				new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredShard.getKey()),
+				new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredShard.getKey()),
 					restoredShard.getKey(), restoredShard.getValue()));
 		}
 	}
@@ -755,9 +755,9 @@ public class FlinkKinesisConsumerTest {
 		// mock operator state backend and initial state for initializeState()
 		// ----------------------------------------------------------------------
 
-		TestingListState<Tuple2<KinesisStreamShardV2, SequenceNumber>> listState = new TestingListState<>();
+		TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = new TestingListState<>();
 		for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredState.entrySet()) {
-			listState.add(Tuple2.of(KinesisDataFetcher.createKinesisStreamShardV2(state.getKey()), state.getValue()));
+			listState.add(Tuple2.of(KinesisDataFetcher.convertToStreamShardMetadata(state.getKey()), state.getValue()));
 		}
 
 		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
@@ -793,7 +793,7 @@ public class FlinkKinesisConsumerTest {
 
 		for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
 			Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
-				new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredShard.getKey()),
+				new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredShard.getKey()),
 					restoredShard.getKey(), restoredShard.getValue()));
 		}
 	}
@@ -814,12 +814,12 @@ public class FlinkKinesisConsumerTest {
 		// mock operator state backend and initial state for initializeState()
 		// ----------------------------------------------------------------------
 
-		TestingListState<Tuple2<KinesisStreamShardV2, SequenceNumber>> listState = new TestingListState<>();
+		TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = new TestingListState<>();
 		for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredState.entrySet()) {
-			listState.add(Tuple2.of(KinesisDataFetcher.createKinesisStreamShardV2(state.getKey()), state.getValue()));
+			listState.add(Tuple2.of(KinesisDataFetcher.convertToStreamShardMetadata(state.getKey()), state.getValue()));
 		}
 		for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredStateForOthers.entrySet()) {
-			listState.add(Tuple2.of(KinesisDataFetcher.createKinesisStreamShardV2(state.getKey()), state.getValue()));
+			listState.add(Tuple2.of(KinesisDataFetcher.convertToStreamShardMetadata(state.getKey()), state.getValue()));
 		}
 
 		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
@@ -856,13 +856,13 @@ public class FlinkKinesisConsumerTest {
 		for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredStateForOthers.entrySet()) {
 			// should never get restored state not belonging to itself
 			Mockito.verify(mockedFetcher, never()).registerNewSubscribedShardState(
-				new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredShard.getKey()),
+				new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredShard.getKey()),
 					restoredShard.getKey(), restoredShard.getValue()));
 		}
 		for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
 			// should get restored state belonging to itself
 			Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
-				new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredShard.getKey()),
+				new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredShard.getKey()),
 					restoredShard.getKey(), restoredShard.getValue()));
 		}
 	}
@@ -910,9 +910,9 @@ public class FlinkKinesisConsumerTest {
 		// mock operator state backend and initial state for initializeState()
 		// ----------------------------------------------------------------------
 
-		TestingListState<Tuple2<KinesisStreamShardV2, SequenceNumber>> listState = new TestingListState<>();
+		TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = new TestingListState<>();
 		for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredState.entrySet()) {
-			listState.add(Tuple2.of(KinesisDataFetcher.createKinesisStreamShardV2(state.getKey()), state.getValue()));
+			listState.add(Tuple2.of(KinesisDataFetcher.convertToStreamShardMetadata(state.getKey()), state.getValue()));
 		}
 
 		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
@@ -953,13 +953,13 @@ public class FlinkKinesisConsumerTest {
 			SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
 		for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
 			Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
-				new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredShard.getKey()),
+				new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredShard.getKey()),
 					restoredShard.getKey(), restoredShard.getValue()));
 		}
 	}
 
 	@Test
-	public void testCreateFunctionToConvertBetweenKinesisStreamShardAndKinesisStreamShardV2() {
+	public void testLegacyKinesisStreamShardToStreamShardMetadataConversion() {
 		String streamName = "fakeStream1";
 		String shardId = "shard-000001";
 		String parentShardId = "shard-000002";
@@ -969,15 +969,15 @@ public class FlinkKinesisConsumerTest {
 		String startingSequenceNumber = "seq-0000021";
 		String endingSequenceNumber = "seq-00000031";
 
-		KinesisStreamShardV2 kinesisStreamShardV2 = new KinesisStreamShardV2();
-		kinesisStreamShardV2.setStreamName(streamName);
-		kinesisStreamShardV2.setShardId(shardId);
-		kinesisStreamShardV2.setParentShardId(parentShardId);
-		kinesisStreamShardV2.setAdjacentParentShardId(adjacentParentShardId);
-		kinesisStreamShardV2.setStartingHashKey(startingHashKey);
-		kinesisStreamShardV2.setEndingHashKey(endingHashKey);
-		kinesisStreamShardV2.setStartingSequenceNumber(startingSequenceNumber);
-		kinesisStreamShardV2.setEndingSequenceNumber(endingSequenceNumber);
+		StreamShardMetadata streamShardMetadata = new StreamShardMetadata();
+		streamShardMetadata.setStreamName(streamName);
+		streamShardMetadata.setShardId(shardId);
+		streamShardMetadata.setParentShardId(parentShardId);
+		streamShardMetadata.setAdjacentParentShardId(adjacentParentShardId);
+		streamShardMetadata.setStartingHashKey(startingHashKey);
+		streamShardMetadata.setEndingHashKey(endingHashKey);
+		streamShardMetadata.setStartingSequenceNumber(startingSequenceNumber);
+		streamShardMetadata.setEndingSequenceNumber(endingSequenceNumber);
 
 		Shard shard = new Shard()
 			.withShardId(shardId)
@@ -991,12 +991,12 @@ public class FlinkKinesisConsumerTest {
 				.withEndingSequenceNumber(endingSequenceNumber));
 		KinesisStreamShard kinesisStreamShard = new KinesisStreamShard(streamName, shard);
 
-		assertEquals(kinesisStreamShardV2, FlinkKinesisConsumer.createKinesisStreamShardV2(kinesisStreamShard));
+		assertEquals(streamShardMetadata, KinesisStreamShard.convertToStreamShardMetadata(kinesisStreamShard));
 	}
 
 	@Test
-	public void testKinesisStreamShardV2WillUsePojoSerializer() {
-		TypeInformation<KinesisStreamShardV2> typeInformation = TypeInformation.of(KinesisStreamShardV2.class);
+	public void testStreamShardMetadataSerializedUsingPojoSerializer() {
+		TypeInformation<StreamShardMetadata> typeInformation = TypeInformation.of(StreamShardMetadata.class);
 		assertTrue(typeInformation.createSerializer(new ExecutionConfig()) instanceof PojoSerializer);
 	}
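
The renamed test above pins down that StreamShardMetadata is serialized by Flink's PojoSerializer rather than a generic fallback serializer, which is what makes the checkpointed shard state tolerant to future field changes. A minimal standalone sketch of the same check (the class name PojoSerializerCheck is illustrative, not part of the commit):

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
    import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;

    public class PojoSerializerCheck {
        public static void main(String[] args) {
            // StreamShardMetadata is a plain bean (public no-arg constructor plus
            // getters/setters), so Flink's type extraction yields a PojoSerializer.
            TypeInformation<StreamShardMetadata> typeInfo =
                TypeInformation.of(StreamShardMetadata.class);
            boolean isPojo =
                typeInfo.createSerializer(new ExecutionConfig()) instanceof PojoSerializer;
            System.out.println("Serialized with PojoSerializer: " + isPojo);
        }
    }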
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe4df33/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
index 7c36945..4fb6dd4 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConsta
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardV2;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisBehavioursFactory;
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
@@ -205,7 +205,7 @@ public class KinesisDataFetcherTest {
 		for (Map.Entry<StreamShardHandle, String> restoredState : restoredStateUnderTest.entrySet()) {
 			fetcher.advanceLastDiscoveredShardOfStream(restoredState.getKey().getStreamName(), restoredState.getKey().getShard().getShardId());
 			fetcher.registerNewSubscribedShardState(
-				new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredState.getKey()),
+				new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredState.getKey()),
 					restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
 		}
 
@@ -296,7 +296,7 @@ public class KinesisDataFetcherTest {
 		for (Map.Entry<StreamShardHandle, String> restoredState : restoredStateUnderTest.entrySet()) {
 			fetcher.advanceLastDiscoveredShardOfStream(restoredState.getKey().getStreamName(), restoredState.getKey().getShard().getShardId());
 			fetcher.registerNewSubscribedShardState(
-				new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredState.getKey()),
+				new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredState.getKey()),
 					restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
 		}
 
@@ -391,7 +391,7 @@ public class KinesisDataFetcherTest {
 		for (Map.Entry<StreamShardHandle, String> restoredState : restoredStateUnderTest.entrySet()) {
 			fetcher.advanceLastDiscoveredShardOfStream(restoredState.getKey().getStreamName(), restoredState.getKey().getShard().getShardId());
 			fetcher.registerNewSubscribedShardState(
-				new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredState.getKey()),
+				new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredState.getKey()),
 					restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
 		}
 
@@ -487,7 +487,7 @@ public class KinesisDataFetcherTest {
 		for (Map.Entry<StreamShardHandle, String> restoredState : restoredStateUnderTest.entrySet()) {
 			fetcher.advanceLastDiscoveredShardOfStream(restoredState.getKey().getStreamName(), restoredState.getKey().getShard().getShardId());
 			fetcher.registerNewSubscribedShardState(
-				new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredState.getKey()),
+				new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredState.getKey()),
 					restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
 		}
 
@@ -521,7 +521,7 @@ public class KinesisDataFetcherTest {
 	}
 
 	@Test
-	public void testCreateFunctionToConvertBetweenKinesisStreamShardV2AndStreamShardHandle() {
+	public void testStreamShardMetadataAndHandleConversion() {
 		String streamName = "fakeStream1";
 		String shardId = "shard-000001";
 		String parentShardId = "shard-000002";
@@ -531,7 +531,7 @@ public class KinesisDataFetcherTest {
 		String startingSequenceNumber = "seq-0000021";
 		String endingSequenceNumber = "seq-00000031";
 
-		KinesisStreamShardV2 kinesisStreamShard = new KinesisStreamShardV2();
+		StreamShardMetadata kinesisStreamShard = new StreamShardMetadata();
 		kinesisStreamShard.setStreamName(streamName);
 		kinesisStreamShard.setShardId(shardId);
 		kinesisStreamShard.setParentShardId(parentShardId);
@@ -553,8 +553,8 @@ public class KinesisDataFetcherTest {
 				.withEndingSequenceNumber(endingSequenceNumber));
 		StreamShardHandle streamShardHandle = new StreamShardHandle(streamName, shard);
 
-		assertEquals(kinesisStreamShard, KinesisDataFetcher.createKinesisStreamShardV2(streamShardHandle));
-		assertEquals(streamShardHandle, KinesisDataFetcher.createStreamShardHandle(kinesisStreamShard));
+		assertEquals(kinesisStreamShard, KinesisDataFetcher.convertToStreamShardMetadata(streamShardHandle));
+		assertEquals(streamShardHandle, KinesisDataFetcher.convertToStreamShardHandle(kinesisStreamShard));
 	}
 
 	private static class DummyFlinkKafkaConsumer<T> extends FlinkKinesisConsumer<T> {
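
The conversion test above exercises both directions of the renamed helpers. A compact round-trip sketch, assuming the same imports as the test file and placeholder shard values, shows the invariant being asserted:

    // handle -> metadata -> handle should preserve equality, as in
    // testStreamShardMetadataAndHandleConversion above
    Shard shard = new Shard()
        .withShardId("shardId-000000000000")
        .withParentShardId("shardId-000000000001")
        .withHashKeyRange(new HashKeyRange()
            .withStartingHashKey("0")
            .withEndingHashKey("100"))
        .withSequenceNumberRange(new SequenceNumberRange()
            .withStartingSequenceNumber("1")
            .withEndingSequenceNumber("2"));
    StreamShardHandle handle = new StreamShardHandle("exampleStream", shard);

    StreamShardMetadata metadata = KinesisDataFetcher.convertToStreamShardMetadata(handle);
    // both ranges are fully set here, so the converters' null guards are not hit
    assertEquals(handle, KinesisDataFetcher.convertToStreamShardHandle(metadata));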

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe4df33/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
index 4e06329..b22ba0c 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
@@ -54,7 +54,7 @@ public class ShardConsumerTest {
 
 		LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
 		subscribedShardsStateUnderTest.add(
-			new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(fakeToBeConsumedShard),
+			new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(fakeToBeConsumedShard),
 				fakeToBeConsumedShard, new SequenceNumber("fakeStartingState")));
 
 		TestableKinesisDataFetcher fetcher =
@@ -93,7 +93,7 @@ public class ShardConsumerTest {
 
 		LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
 		subscribedShardsStateUnderTest.add(
-			new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(fakeToBeConsumedShard),
+			new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(fakeToBeConsumedShard),
 				fakeToBeConsumedShard, new SequenceNumber("fakeStartingState")));
 
 		TestableKinesisDataFetcher fetcher =
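
The seeding pattern in the ShardConsumerTest hunks above pairs the checkpointable metadata with the runtime handle in a single state row, and the consumer later reads the handle back out. A condensed sketch, assuming a StreamShardHandle named fakeShard built as in the tests:

    LinkedList<KinesisStreamShardState> states = new LinkedList<>();
    states.add(new KinesisStreamShardState(
        KinesisDataFetcher.convertToStreamShardMetadata(fakeShard), // written to checkpoints
        fakeShard,                                                  // used against Kinesis
        new SequenceNumber("fakeStartingState")));

    // the runtime handle is recovered from the state when wiring a ShardConsumer
    StreamShardHandle handle = states.get(0).getStreamShardHandle();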


[6/8] flink git commit: [FLINK-6653] Avoid directly serializing AWS's Shard class in Kinesis consumer's checkpoints

Posted by tz...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/64ca1aa5/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
index 800fde5..7c36945 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -17,14 +17,17 @@
 
 package org.apache.flink.streaming.connectors.kinesis.internals;
 
+import com.amazonaws.services.kinesis.model.HashKeyRange;
+import com.amazonaws.services.kinesis.model.SequenceNumberRange;
 import com.amazonaws.services.kinesis.model.Shard;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardV2;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisBehavioursFactory;
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
@@ -46,6 +49,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -149,33 +153,33 @@ public class KinesisDataFetcherTest {
 		fakeStreams.add("fakeStream1");
 		fakeStreams.add("fakeStream2");
 
-		Map<KinesisStreamShard, String> restoredStateUnderTest = new HashMap<>();
+		Map<StreamShardHandle, String> restoredStateUnderTest = new HashMap<>();
 
 		// fakeStream1 has 3 shards before restore
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream1",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
 			UUID.randomUUID().toString());
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream1",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
 			UUID.randomUUID().toString());
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream1",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
 			UUID.randomUUID().toString());
 
 		// fakeStream2 has 2 shards before restore
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream2",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
 			UUID.randomUUID().toString());
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream2",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
 			UUID.randomUUID().toString());
@@ -198,10 +202,11 @@ public class KinesisDataFetcherTest {
 				subscribedStreamsToLastSeenShardIdsUnderTest,
 				FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(streamToShardCount));
 
-		for (Map.Entry<KinesisStreamShard, String> restoredState : restoredStateUnderTest.entrySet()) {
+		for (Map.Entry<StreamShardHandle, String> restoredState : restoredStateUnderTest.entrySet()) {
 			fetcher.advanceLastDiscoveredShardOfStream(restoredState.getKey().getStreamName(), restoredState.getKey().getShard().getShardId());
 			fetcher.registerNewSubscribedShardState(
-				new KinesisStreamShardState(restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
+				new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredState.getKey()),
+					restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
 		}
 
 		PowerMockito.whenNew(ShardConsumer.class).withAnyArguments().thenReturn(Mockito.mock(ShardConsumer.class));
@@ -238,33 +243,33 @@ public class KinesisDataFetcherTest {
 		fakeStreams.add("fakeStream1");
 		fakeStreams.add("fakeStream2");
 
-		Map<KinesisStreamShard, String> restoredStateUnderTest = new HashMap<>();
+		Map<StreamShardHandle, String> restoredStateUnderTest = new HashMap<>();
 
 		// fakeStream1 has 3 shards before restore
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream1",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
 			UUID.randomUUID().toString());
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream1",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
 			UUID.randomUUID().toString());
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream1",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
 			UUID.randomUUID().toString());
 
 		// fakeStream2 has 2 shards before restore
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream2",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
 			UUID.randomUUID().toString());
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream2",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
 			UUID.randomUUID().toString());
@@ -288,10 +293,11 @@ public class KinesisDataFetcherTest {
 				subscribedStreamsToLastSeenShardIdsUnderTest,
 				FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(streamToShardCount));
 
-		for (Map.Entry<KinesisStreamShard, String> restoredState : restoredStateUnderTest.entrySet()) {
+		for (Map.Entry<StreamShardHandle, String> restoredState : restoredStateUnderTest.entrySet()) {
 			fetcher.advanceLastDiscoveredShardOfStream(restoredState.getKey().getStreamName(), restoredState.getKey().getShard().getShardId());
 			fetcher.registerNewSubscribedShardState(
-				new KinesisStreamShardState(restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
+				new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredState.getKey()),
+					restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
 		}
 
 		PowerMockito.whenNew(ShardConsumer.class).withAnyArguments().thenReturn(Mockito.mock(ShardConsumer.class));
@@ -330,33 +336,33 @@ public class KinesisDataFetcherTest {
 		fakeStreams.add("fakeStream3"); // fakeStream3 will not have any shards
 		fakeStreams.add("fakeStream4"); // fakeStream4 will not have any shards
 
-		Map<KinesisStreamShard, String> restoredStateUnderTest = new HashMap<>();
+		Map<StreamShardHandle, String> restoredStateUnderTest = new HashMap<>();
 
 		// fakeStream1 has 3 shards before restore
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream1",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
 			UUID.randomUUID().toString());
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream1",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
 			UUID.randomUUID().toString());
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream1",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
 			UUID.randomUUID().toString());
 
 		// fakeStream2 has 2 shards before restore
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream2",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
 			UUID.randomUUID().toString());
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream2",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
 			UUID.randomUUID().toString());
@@ -382,10 +388,11 @@ public class KinesisDataFetcherTest {
 				subscribedStreamsToLastSeenShardIdsUnderTest,
 				FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(streamToShardCount));
 
-		for (Map.Entry<KinesisStreamShard, String> restoredState : restoredStateUnderTest.entrySet()) {
+		for (Map.Entry<StreamShardHandle, String> restoredState : restoredStateUnderTest.entrySet()) {
 			fetcher.advanceLastDiscoveredShardOfStream(restoredState.getKey().getStreamName(), restoredState.getKey().getShard().getShardId());
 			fetcher.registerNewSubscribedShardState(
-				new KinesisStreamShardState(restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
+				new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredState.getKey()),
+					restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
 		}
 
 		PowerMockito.whenNew(ShardConsumer.class).withAnyArguments().thenReturn(Mockito.mock(ShardConsumer.class));
@@ -425,33 +432,33 @@ public class KinesisDataFetcherTest {
 		fakeStreams.add("fakeStream3"); // fakeStream3 will not have any shards
 		fakeStreams.add("fakeStream4"); // fakeStream4 will not have any shards
 
-		Map<KinesisStreamShard, String> restoredStateUnderTest = new HashMap<>();
+		Map<StreamShardHandle, String> restoredStateUnderTest = new HashMap<>();
 
 		// fakeStream1 has 3 shards before restore
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream1",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
 			UUID.randomUUID().toString());
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream1",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
 			UUID.randomUUID().toString());
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream1",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
 			UUID.randomUUID().toString());
 
 		// fakeStream2 has 2 shards before restore
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream2",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
 			UUID.randomUUID().toString());
 		restoredStateUnderTest.put(
-			new KinesisStreamShard(
+			new StreamShardHandle(
 				"fakeStream2",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
 			UUID.randomUUID().toString());
@@ -477,10 +484,11 @@ public class KinesisDataFetcherTest {
 				subscribedStreamsToLastSeenShardIdsUnderTest,
 				FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(streamToShardCount));
 
-		for (Map.Entry<KinesisStreamShard, String> restoredState : restoredStateUnderTest.entrySet()) {
+		for (Map.Entry<StreamShardHandle, String> restoredState : restoredStateUnderTest.entrySet()) {
 			fetcher.advanceLastDiscoveredShardOfStream(restoredState.getKey().getStreamName(), restoredState.getKey().getShard().getShardId());
 			fetcher.registerNewSubscribedShardState(
-				new KinesisStreamShardState(restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
+				new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredState.getKey()),
+					restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
 		}
 
 		PowerMockito.whenNew(ShardConsumer.class).withAnyArguments().thenReturn(Mockito.mock(ShardConsumer.class));
@@ -512,6 +520,43 @@ public class KinesisDataFetcherTest {
 		assertTrue(subscribedStreamsToLastSeenShardIdsUnderTest.get("fakeStream4") == null);
 	}
 
+	@Test
+	public void testCreateFunctionToConvertBetweenKinesisStreamShardV2AndStreamShardHandle() {
+		String streamName = "fakeStream1";
+		String shardId = "shard-000001";
+		String parentShardId = "shard-000002";
+		String adjacentParentShardId = "shard-000003";
+		String startingHashKey = "key-000001";
+		String endingHashKey = "key-000010";
+		String startingSequenceNumber = "seq-0000021";
+		String endingSequenceNumber = "seq-00000031";
+
+		KinesisStreamShardV2 kinesisStreamShard = new KinesisStreamShardV2();
+		kinesisStreamShard.setStreamName(streamName);
+		kinesisStreamShard.setShardId(shardId);
+		kinesisStreamShard.setParentShardId(parentShardId);
+		kinesisStreamShard.setAdjacentParentShardId(adjacentParentShardId);
+		kinesisStreamShard.setStartingHashKey(startingHashKey);
+		kinesisStreamShard.setEndingHashKey(endingHashKey);
+		kinesisStreamShard.setStartingSequenceNumber(startingSequenceNumber);
+		kinesisStreamShard.setEndingSequenceNumber(endingSequenceNumber);
+
+		Shard shard = new Shard()
+			.withShardId(shardId)
+			.withParentShardId(parentShardId)
+			.withAdjacentParentShardId(adjacentParentShardId)
+			.withHashKeyRange(new HashKeyRange()
+				.withStartingHashKey(startingHashKey)
+				.withEndingHashKey(endingHashKey))
+			.withSequenceNumberRange(new SequenceNumberRange()
+				.withStartingSequenceNumber(startingSequenceNumber)
+				.withEndingSequenceNumber(endingSequenceNumber));
+		StreamShardHandle streamShardHandle = new StreamShardHandle(streamName, shard);
+
+		assertEquals(kinesisStreamShard, KinesisDataFetcher.createKinesisStreamShardV2(streamShardHandle));
+		assertEquals(streamShardHandle, KinesisDataFetcher.createStreamShardHandle(kinesisStreamShard));
+	}
+
 	private static class DummyFlinkKafkaConsumer<T> extends FlinkKinesisConsumer<T> {
 		private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/64ca1aa5/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
index 96764a4..4e06329 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis.internals;
 import com.amazonaws.services.kinesis.model.HashKeyRange;
 import com.amazonaws.services.kinesis.model.Shard;
 import org.apache.commons.lang.StringUtils;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
@@ -43,7 +43,7 @@ public class ShardConsumerTest {
 
 	@Test
 	public void testCorrectNumOfCollectedRecordsAndUpdatedState() {
-		KinesisStreamShard fakeToBeConsumedShard = new KinesisStreamShard(
+		StreamShardHandle fakeToBeConsumedShard = new StreamShardHandle(
 			"fakeStream",
 			new Shard()
 				.withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))
@@ -54,7 +54,8 @@ public class ShardConsumerTest {
 
 		LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
 		subscribedShardsStateUnderTest.add(
-			new KinesisStreamShardState(fakeToBeConsumedShard, new SequenceNumber("fakeStartingState")));
+			new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(fakeToBeConsumedShard),
+				fakeToBeConsumedShard, new SequenceNumber("fakeStartingState")));
 
 		TestableKinesisDataFetcher fetcher =
 			new TestableKinesisDataFetcher(
@@ -70,7 +71,7 @@ public class ShardConsumerTest {
 		new ShardConsumer<>(
 			fetcher,
 			0,
-			subscribedShardsStateUnderTest.get(0).getKinesisStreamShard(),
+			subscribedShardsStateUnderTest.get(0).getStreamShardHandle(),
 			subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
 			FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(1000, 9)).run();
 
@@ -81,7 +82,7 @@ public class ShardConsumerTest {
 
 	@Test
 	public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithUnexpectedExpiredIterator() {
-		KinesisStreamShard fakeToBeConsumedShard = new KinesisStreamShard(
+		StreamShardHandle fakeToBeConsumedShard = new StreamShardHandle(
 			"fakeStream",
 			new Shard()
 				.withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))
@@ -92,7 +93,8 @@ public class ShardConsumerTest {
 
 		LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
 		subscribedShardsStateUnderTest.add(
-			new KinesisStreamShardState(fakeToBeConsumedShard, new SequenceNumber("fakeStartingState")));
+			new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(fakeToBeConsumedShard),
+				fakeToBeConsumedShard, new SequenceNumber("fakeStartingState")));
 
 		TestableKinesisDataFetcher fetcher =
 			new TestableKinesisDataFetcher(
@@ -108,7 +110,7 @@ public class ShardConsumerTest {
 		new ShardConsumer<>(
 			fetcher,
 			0,
-			subscribedShardsStateUnderTest.get(0).getKinesisStreamShard(),
+			subscribedShardsStateUnderTest.get(0).getStreamShardHandle(),
 			subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
 			// Get a total of 1000 records with 9 getRecords() calls,
 			// and the 7th getRecords() call will encounter an unexpected expired shard iterator

http://git-wip-us.apache.org/repos/asf/flink/blob/64ca1aa5/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
index b62e7de..ce5a0de 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
@@ -22,7 +22,7 @@ import com.amazonaws.services.kinesis.model.GetRecordsResult;
 import com.amazonaws.services.kinesis.model.Record;
 import com.amazonaws.services.kinesis.model.Shard;
 import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 
@@ -55,7 +55,7 @@ public class FakeKinesisBehavioursFactory {
 			}
 
 			@Override
-			public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, Object startingMarker) {
+			public String getShardIterator(StreamShardHandle shard, String shardIteratorType, Object startingMarker) {
 				return null;
 			}
 
@@ -122,7 +122,7 @@ public class FakeKinesisBehavioursFactory {
 		}
 
 		@Override
-		public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, Object startingMarker) {
+		public String getShardIterator(StreamShardHandle shard, String shardIteratorType, Object startingMarker) {
 			if (!expiredOnceAlready) {
 				// for the first call, just return the iterator of the first batch of records
 				return "0";
@@ -181,7 +181,7 @@ public class FakeKinesisBehavioursFactory {
 		}
 
 		@Override
-		public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, Object startingMarker) {
+		public String getShardIterator(StreamShardHandle shard, String shardIteratorType, Object startingMarker) {
 			// this will be called only one time per ShardConsumer;
 			// so, simply return the iterator of the first batch of records
 			return "0";
@@ -209,7 +209,7 @@ public class FakeKinesisBehavioursFactory {
 
 	private static class NonReshardedStreamsKinesis implements KinesisProxyInterface {
 
-		private Map<String, List<KinesisStreamShard>> streamsWithListOfShards = new HashMap<>();
+		private Map<String, List<StreamShardHandle>> streamsWithListOfShards = new HashMap<>();
 
 		public NonReshardedStreamsKinesis(Map<String,Integer> streamsToShardCount) {
 			for (Map.Entry<String,Integer> streamToShardCount : streamsToShardCount.entrySet()) {
@@ -219,10 +219,10 @@ public class FakeKinesisBehavioursFactory {
 				if (shardCount == 0) {
 					// don't do anything
 				} else {
-					List<KinesisStreamShard> shardsOfStream = new ArrayList<>(shardCount);
+					List<StreamShardHandle> shardsOfStream = new ArrayList<>(shardCount);
 					for (int i=0; i < shardCount; i++) {
 						shardsOfStream.add(
-							new KinesisStreamShard(
+							new StreamShardHandle(
 								streamName,
 								new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(i))));
 					}
@@ -234,13 +234,13 @@ public class FakeKinesisBehavioursFactory {
 		@Override
 		public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) {
 			GetShardListResult result = new GetShardListResult();
-			for (Map.Entry<String, List<KinesisStreamShard>> streamsWithShards : streamsWithListOfShards.entrySet()) {
+			for (Map.Entry<String, List<StreamShardHandle>> streamsWithShards : streamsWithListOfShards.entrySet()) {
 				String streamName = streamsWithShards.getKey();
-				for (KinesisStreamShard shard : streamsWithShards.getValue()) {
+				for (StreamShardHandle shard : streamsWithShards.getValue()) {
 					if (streamNamesWithLastSeenShardIds.get(streamName) == null) {
 						result.addRetrievedShardToStream(streamName, shard);
 					} else {
-						if (KinesisStreamShard.compareShardIds(
+						if (StreamShardHandle.compareShardIds(
 							shard.getShard().getShardId(), streamNamesWithLastSeenShardIds.get(streamName)) > 0) {
 							result.addRetrievedShardToStream(streamName, shard);
 						}
@@ -251,7 +251,7 @@ public class FakeKinesisBehavioursFactory {
 		}
 
 		@Override
-		public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, Object startingMarker) {
+		public String getShardIterator(StreamShardHandle shard, String shardIteratorType, Object startingMarker) {
 			return null;
 		}
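
NonReshardedStreamsKinesis above returns only shards whose id compares greater than the last-seen id for a stream, mimicking incremental shard discovery. A tiny sketch of the ordering it relies on; the assumption here is that compareShardIds orders ids by their numeric suffix (e.g. shardId-000000000000), which the generated test ids suggest:

    String older = KinesisShardIdGenerator.generateFromShardOrder(0);
    String newer = KinesisShardIdGenerator.generateFromShardOrder(1);
    // assumed: a larger shard-order suffix compares as "newer"
    assertTrue(StreamShardHandle.compareShardIds(newer, older) > 0);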
 


[7/8] flink git commit: [FLINK-6653] Avoid directly serializing AWS's Shard class in Kinesis consumer's checkpoints

Posted by tz...@apache.org.
[FLINK-6653] Avoid directly serializing AWS's Shard class in Kinesis consumer's checkpoints


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/64ca1aa5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/64ca1aa5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/64ca1aa5

Branch: refs/heads/release-1.3
Commit: 64ca1aa5909d8d4e60a847679accee8d9aeb24e2
Parents: 4ae040c
Author: Tony Wei <to...@gmail.com>
Authored: Thu May 25 10:39:22 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri May 26 16:33:42 2017 +0800

----------------------------------------------------------------------
 .../kinesis/FlinkKinesisConsumer.java           |  71 +++++--
 .../kinesis/internals/KinesisDataFetcher.java   |  92 +++++++--
 .../kinesis/internals/ShardConsumer.java        |   8 +-
 .../kinesis/model/KinesisStreamShard.java       |   5 +-
 .../kinesis/model/KinesisStreamShardState.java  |  21 +-
 .../kinesis/model/KinesisStreamShardV2.java     | 171 ++++++++++++++++
 .../kinesis/model/StreamShardHandle.java        | 129 ++++++++++++
 .../kinesis/proxy/GetShardListResult.java       |  16 +-
 .../connectors/kinesis/proxy/KinesisProxy.java  |  12 +-
 .../kinesis/proxy/KinesisProxyInterface.java    |   4 +-
 .../FlinkKinesisConsumerMigrationTest.java      |   7 +-
 .../kinesis/FlinkKinesisConsumerTest.java       | 205 ++++++++++++-------
 .../internals/KinesisDataFetcherTest.java       | 111 +++++++---
 .../kinesis/internals/ShardConsumerTest.java    |  16 +-
 .../testutils/FakeKinesisBehavioursFactory.java |  22 +-
 15 files changed, 698 insertions(+), 192 deletions(-)
----------------------------------------------------------------------
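
At the core of this commit, KinesisStreamShardState changes from wrapping AWS's Shard directly to carrying two views of the same shard: a checkpoint-friendly POJO (KinesisStreamShardV2) and a runtime handle (StreamShardHandle). A condensed sketch of the pairing, with placeholder stream and shard ids:

    StreamShardHandle handle = new StreamShardHandle(
        "exampleStream",
        new Shard().withShardId("shardId-000000000000"));

    KinesisStreamShardState state = new KinesisStreamShardState(
        KinesisDataFetcher.createKinesisStreamShardV2(handle), // serialized into checkpoints
        handle,                                                 // passed to the AWS client
        SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());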


http://git-wip-us.apache.org/repos/asf/flink/blob/64ca1aa5/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index 4982f7f..b7f5506 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -38,6 +38,8 @@ import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardV2;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
@@ -98,7 +100,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 	private transient KinesisDataFetcher<T> fetcher;
 
 	/** The sequence numbers to restore to upon restore from failure */
-	private transient HashMap<KinesisStreamShard, SequenceNumber> sequenceNumsToRestore;
+	private transient HashMap<KinesisStreamShardV2, SequenceNumber> sequenceNumsToRestore;
 
 	private volatile boolean running = true;
 
@@ -109,7 +111,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 	/** State name to access shard sequence number states; cannot be changed */
 	private static final String sequenceNumsStateStoreName = "Kinesis-Stream-Shard-State";
 
-	private transient ListState<Tuple2<KinesisStreamShard, SequenceNumber>> sequenceNumsStateForCheckpoint;
+	private transient ListState<Tuple2<KinesisStreamShardV2, SequenceNumber>> sequenceNumsStateForCheckpoint;
 
 	// ------------------------------------------------------------------------
 	//  Constructors
@@ -197,25 +199,26 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 		KinesisDataFetcher<T> fetcher = createFetcher(streams, sourceContext, getRuntimeContext(), configProps, deserializer);
 
 		// initial discovery
-		List<KinesisStreamShard> allShards = fetcher.discoverNewShardsToSubscribe();
+		List<StreamShardHandle> allShards = fetcher.discoverNewShardsToSubscribe();
 
-		for (KinesisStreamShard shard : allShards) {
+		for (StreamShardHandle shard : allShards) {
+			KinesisStreamShardV2 kinesisStreamShard = KinesisDataFetcher.createKinesisStreamShardV2(shard);
 			if (sequenceNumsToRestore != null) {
-				if (sequenceNumsToRestore.containsKey(shard)) {
+				if (sequenceNumsToRestore.containsKey(kinesisStreamShard)) {
 					// if the shard was already seen and is contained in the state,
 					// just use the sequence number stored in the state
 					fetcher.registerNewSubscribedShardState(
-						new KinesisStreamShardState(shard, sequenceNumsToRestore.get(shard)));
+						new KinesisStreamShardState(kinesisStreamShard, shard, sequenceNumsToRestore.get(kinesisStreamShard)));
 
 					if (LOG.isInfoEnabled()) {
 						LOG.info("Subtask {} is seeding the fetcher with restored shard {}," +
 								" starting state set to the restored sequence number {}",
-							getRuntimeContext().getIndexOfThisSubtask(), shard.toString(), sequenceNumsToRestore.get(shard));
+							getRuntimeContext().getIndexOfThisSubtask(), shard.toString(), sequenceNumsToRestore.get(kinesisStreamShard));
 					}
 				} else {
 					// the shard wasn't discovered in the previous run, therefore should be consumed from the beginning
 					fetcher.registerNewSubscribedShardState(
-						new KinesisStreamShardState(shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get()));
+						new KinesisStreamShardState(kinesisStreamShard, shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get()));
 
 					if (LOG.isInfoEnabled()) {
 						LOG.info("Subtask {} is seeding the fetcher with new discovered shard {}," +
@@ -231,7 +234,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 						ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION)).toSentinelSequenceNumber();
 
 				fetcher.registerNewSubscribedShardState(
-					new KinesisStreamShardState(shard, startingSeqNum.get()));
+					new KinesisStreamShardState(kinesisStreamShard, shard, startingSeqNum.get()));
 
 				if (LOG.isInfoEnabled()) {
 					LOG.info("Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}",
@@ -295,8 +298,8 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 
 	@Override
 	public void initializeState(FunctionInitializationContext context) throws Exception {
-		TypeInformation<Tuple2<KinesisStreamShard, SequenceNumber>> shardsStateTypeInfo = new TupleTypeInfo<>(
-			TypeInformation.of(KinesisStreamShard.class),
+		TypeInformation<Tuple2<KinesisStreamShardV2, SequenceNumber>> shardsStateTypeInfo = new TupleTypeInfo<>(
+			TypeInformation.of(KinesisStreamShardV2.class),
 			TypeInformation.of(SequenceNumber.class));
 
 		sequenceNumsStateForCheckpoint = context.getOperatorStateStore().getUnionListState(
@@ -305,7 +308,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 		if (context.isRestored()) {
 			if (sequenceNumsToRestore == null) {
 				sequenceNumsToRestore = new HashMap<>();
-				for (Tuple2<KinesisStreamShard, SequenceNumber> kinesisSequenceNumber : sequenceNumsStateForCheckpoint.get()) {
+				for (Tuple2<KinesisStreamShardV2, SequenceNumber> kinesisSequenceNumber : sequenceNumsStateForCheckpoint.get()) {
 					sequenceNumsToRestore.put(kinesisSequenceNumber.f0, kinesisSequenceNumber.f1);
 				}
 
@@ -330,12 +333,12 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 
 			if (fetcher == null) {
 				if (sequenceNumsToRestore != null) {
-					for (Map.Entry<KinesisStreamShard, SequenceNumber> entry : sequenceNumsToRestore.entrySet()) {
+					for (Map.Entry<KinesisStreamShardV2, SequenceNumber> entry : sequenceNumsToRestore.entrySet()) {
 						// sequenceNumsToRestore is the restored global union state;
 						// should only snapshot shards that actually belong to us
 
 						if (KinesisDataFetcher.isThisSubtaskShouldSubscribeTo(
-								entry.getKey(),
+								KinesisDataFetcher.createStreamShardHandle(entry.getKey()),
 								getRuntimeContext().getNumberOfParallelSubtasks(),
 								getRuntimeContext().getIndexOfThisSubtask())) {
 
@@ -344,14 +347,14 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 					}
 				}
 			} else {
-				HashMap<KinesisStreamShard, SequenceNumber> lastStateSnapshot = fetcher.snapshotState();
+				HashMap<KinesisStreamShardV2, SequenceNumber> lastStateSnapshot = fetcher.snapshotState();
 
 				if (LOG.isDebugEnabled()) {
 					LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
 						lastStateSnapshot.toString(), context.getCheckpointId(), context.getCheckpointTimestamp());
 				}
 
-				for (Map.Entry<KinesisStreamShard, SequenceNumber> entry : lastStateSnapshot.entrySet()) {
+				for (Map.Entry<KinesisStreamShardV2, SequenceNumber> entry : lastStateSnapshot.entrySet()) {
 					sequenceNumsStateForCheckpoint.add(Tuple2.of(entry.getKey(), entry.getValue()));
 				}
 			}
@@ -363,7 +366,14 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 		LOG.info("Subtask {} restoring offsets from an older Flink version: {}",
 			getRuntimeContext().getIndexOfThisSubtask(), sequenceNumsToRestore);
 
-		sequenceNumsToRestore = restoredState.isEmpty() ? null : restoredState;
+		if (restoredState.isEmpty()) {
+			sequenceNumsToRestore = null;
+		} else {
+			sequenceNumsToRestore = new HashMap<>();
+			for (Map.Entry<KinesisStreamShard, SequenceNumber> kv: restoredState.entrySet()) {
+				sequenceNumsToRestore.put(createKinesisStreamShardV2(kv.getKey()), kv.getValue());
+			}
+		}
 	}
 
 	/** This method is exposed for tests that need to mock the KinesisDataFetcher in the consumer. */
@@ -378,7 +388,32 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 	}
 
 	@VisibleForTesting
-	HashMap<KinesisStreamShard, SequenceNumber> getRestoredState() {
+	HashMap<KinesisStreamShardV2, SequenceNumber> getRestoredState() {
 		return sequenceNumsToRestore;
 	}
+
+	/**
+	 * Utility function to convert {@link KinesisStreamShard} into {@link KinesisStreamShardV2}
+	 *
+	 * @param kinesisStreamShard the {@link KinesisStreamShard} to be converted
+	 * @return a {@link KinesisStreamShardV2} object
+	 */
+	public static KinesisStreamShardV2 createKinesisStreamShardV2(KinesisStreamShard kinesisStreamShard) {
+		KinesisStreamShardV2 kinesisStreamShardV2 = new KinesisStreamShardV2();
+
+		kinesisStreamShardV2.setStreamName(kinesisStreamShard.getStreamName());
+		kinesisStreamShardV2.setShardId(kinesisStreamShard.getShard().getShardId());
+		kinesisStreamShardV2.setParentShardId(kinesisStreamShard.getShard().getParentShardId());
+		kinesisStreamShardV2.setAdjacentParentShardId(kinesisStreamShard.getShard().getAdjacentParentShardId());
+		if (kinesisStreamShard.getShard().getHashKeyRange() != null) {
+			kinesisStreamShardV2.setStartingHashKey(kinesisStreamShard.getShard().getHashKeyRange().getStartingHashKey());
+			kinesisStreamShardV2.setEndingHashKey(kinesisStreamShard.getShard().getHashKeyRange().getEndingHashKey());
+		}
+		if (kinesisStreamShard.getShard().getSequenceNumberRange() != null) {
+			kinesisStreamShardV2.setStartingSequenceNumber(kinesisStreamShard.getShard().getSequenceNumberRange().getStartingSequenceNumber());
+			kinesisStreamShardV2.setEndingSequenceNumber(kinesisStreamShard.getShard().getSequenceNumberRange().getEndingSequenceNumber());
+		}
+
+		return kinesisStreamShardV2;
+	}
 }
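
The null guards in createKinesisStreamShardV2 above exist because a Shard restored from older state may carry no hash-key or sequence-number range. A short sketch of that edge case, with placeholder names:

    Shard bareShard = new Shard().withShardId("shardId-000000000000");
    KinesisStreamShard legacy = new KinesisStreamShard("exampleStream", bareShard);

    KinesisStreamShardV2 converted = FlinkKinesisConsumer.createKinesisStreamShardV2(legacy);
    // with no ranges on the Shard, the range fields simply stay null
    assert converted.getStartingHashKey() == null;
    assert converted.getStartingSequenceNumber() == null;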

http://git-wip-us.apache.org/repos/asf/flink/blob/64ca1aa5/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 99305cb..b0dceec 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -17,13 +17,17 @@
 
 package org.apache.flink.streaming.connectors.kinesis.internals;
 
+import com.amazonaws.services.kinesis.model.HashKeyRange;
+import com.amazonaws.services.kinesis.model.SequenceNumberRange;
+import com.amazonaws.services.kinesis.model.Shard;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardV2;
 import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
@@ -259,7 +263,7 @@ public class KinesisDataFetcher<T> {
 
 				if (LOG.isInfoEnabled()) {
 					LOG.info("Subtask {} will start consuming seeded shard {} from sequence number {} with ShardConsumer {}",
-						indexOfThisConsumerSubtask, seededShardState.getKinesisStreamShard().toString(),
+						indexOfThisConsumerSubtask, seededShardState.getStreamShardHandle().toString(),
 						seededShardState.getLastProcessedSequenceNum(), seededStateIndex);
 					}
 
@@ -267,7 +271,7 @@ public class KinesisDataFetcher<T> {
 					new ShardConsumer<>(
 						this,
 						seededStateIndex,
-						subscribedShardsState.get(seededStateIndex).getKinesisStreamShard(),
+						subscribedShardsState.get(seededStateIndex).getStreamShardHandle(),
 						subscribedShardsState.get(seededStateIndex).getLastProcessedSequenceNum()));
 			}
 		}
@@ -293,19 +297,19 @@ public class KinesisDataFetcher<T> {
 				LOG.debug("Subtask {} is trying to discover new shards that were created due to resharding ...",
 					indexOfThisConsumerSubtask);
 			}
-			List<KinesisStreamShard> newShardsDueToResharding = discoverNewShardsToSubscribe();
+			List<StreamShardHandle> newShardsDueToResharding = discoverNewShardsToSubscribe();
 
-			for (KinesisStreamShard shard : newShardsDueToResharding) {
+			for (StreamShardHandle shard : newShardsDueToResharding) {
 				// since there may be delay in discovering a new shard, all new shards due to
 				// resharding should be read starting from the earliest record possible
 				KinesisStreamShardState newShardState =
-					new KinesisStreamShardState(shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
+					new KinesisStreamShardState(createKinesisStreamShardV2(shard), shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
 				int newStateIndex = registerNewSubscribedShardState(newShardState);
 
 				if (LOG.isInfoEnabled()) {
 					LOG.info("Subtask {} has discovered a new shard {} due to resharding, and will start consuming " +
 							"the shard from sequence number {} with ShardConsumer {}",
-						indexOfThisConsumerSubtask, newShardState.getKinesisStreamShard().toString(),
+						indexOfThisConsumerSubtask, newShardState.getStreamShardHandle().toString(),
 						newShardState.getLastProcessedSequenceNum(), newStateIndex);
 				}
 
@@ -313,7 +317,7 @@ public class KinesisDataFetcher<T> {
 					new ShardConsumer<>(
 						this,
 						newStateIndex,
-						newShardState.getKinesisStreamShard(),
+						newShardState.getStreamShardHandle(),
 						newShardState.getLastProcessedSequenceNum()));
 			}
 
@@ -349,11 +353,11 @@ public class KinesisDataFetcher<T> {
 	 *
 	 * @return state snapshot
 	 */
-	public HashMap<KinesisStreamShard, SequenceNumber> snapshotState() {
+	public HashMap<KinesisStreamShardV2, SequenceNumber> snapshotState() {
 		// this method assumes that the checkpoint lock is held
 		assert Thread.holdsLock(checkpointLock);
 
-		HashMap<KinesisStreamShard, SequenceNumber> stateSnapshot = new HashMap<>();
+		HashMap<KinesisStreamShardV2, SequenceNumber> stateSnapshot = new HashMap<>();
 		for (KinesisStreamShardState shardWithState : subscribedShardsState) {
 			stateSnapshot.put(shardWithState.getKinesisStreamShard(), shardWithState.getLastProcessedSequenceNum());
 		}
@@ -405,7 +409,7 @@ public class KinesisDataFetcher<T> {
 		if (lastSeenShardIdOfStream == null) {
 			// if not previously set, simply put as the last seen shard id
 			this.subscribedStreamsToLastDiscoveredShardIds.put(stream, shardId);
-		} else if (KinesisStreamShard.compareShardIds(shardId, lastSeenShardIdOfStream) > 0) {
+		} else if (StreamShardHandle.compareShardIds(shardId, lastSeenShardIdOfStream) > 0) {
 			this.subscribedStreamsToLastDiscoveredShardIds.put(stream, shardId);
 		}
 	}
@@ -419,17 +423,17 @@ public class KinesisDataFetcher<T> {
 	 * 3. Update the subscribedStreamsToLastDiscoveredShardIds state so that we won't get shards
 	 *    that we have already seen before the next time this function is called
 	 */
-	public List<KinesisStreamShard> discoverNewShardsToSubscribe() throws InterruptedException {
+	public List<StreamShardHandle> discoverNewShardsToSubscribe() throws InterruptedException {
 
-		List<KinesisStreamShard> newShardsToSubscribe = new LinkedList<>();
+		List<StreamShardHandle> newShardsToSubscribe = new LinkedList<>();
 
 		GetShardListResult shardListResult = kinesis.getShardList(subscribedStreamsToLastDiscoveredShardIds);
 		if (shardListResult.hasRetrievedShards()) {
 			Set<String> streamsWithNewShards = shardListResult.getStreamsWithRetrievedShards();
 
 			for (String stream : streamsWithNewShards) {
-				List<KinesisStreamShard> newShardsOfStream = shardListResult.getRetrievedShardListOfStream(stream);
-				for (KinesisStreamShard newShard : newShardsOfStream) {
+				List<StreamShardHandle> newShardsOfStream = shardListResult.getRetrievedShardListOfStream(stream);
+				for (StreamShardHandle newShard : newShardsOfStream) {
 					if (isThisSubtaskShouldSubscribeTo(newShard, totalNumberOfConsumerSubtasks, indexOfThisConsumerSubtask)) {
 						newShardsToSubscribe.add(newShard);
 					}
@@ -502,7 +506,7 @@ public class KinesisDataFetcher<T> {
 			// we've finished reading the shard and should determine it to be non-active
 			if (lastSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
 				LOG.info("Subtask {} has reached the end of subscribed shard: {}",
-					indexOfThisConsumerSubtask, subscribedShardsState.get(shardStateIndex).getKinesisStreamShard());
+					indexOfThisConsumerSubtask, subscribedShardsState.get(shardStateIndex).getStreamShardHandle());
 
 				// check if we need to mark the source as idle;
 				// note that on resharding, if registerNewSubscribedShardState was invoked for newly discovered shards
@@ -549,7 +553,7 @@ public class KinesisDataFetcher<T> {
 	 * @param totalNumberOfConsumerSubtasks total number of consumer subtasks
 	 * @param indexOfThisConsumerSubtask index of this consumer subtask
 	 */
-	public static boolean isThisSubtaskShouldSubscribeTo(KinesisStreamShard shard,
+	public static boolean isThisSubtaskShouldSubscribeTo(StreamShardHandle shard,
 														int totalNumberOfConsumerSubtasks,
 														int indexOfThisConsumerSubtask) {
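 		// e.g. with 2 subtasks, a shard whose handle hashes to 7 is owned by subtask 1,
 		// since Math.abs(7 % 2) == 1; the handle's hash covers only the stream name and
 		// shard id, so this assignment stays stable across restores and rediscoveries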
 		return (Math.abs(shard.hashCode() % totalNumberOfConsumerSubtasks)) == indexOfThisConsumerSubtask;
@@ -582,4 +586,58 @@ public class KinesisDataFetcher<T> {
 		}
 		return initial;
 	}
+
+	/**
+	 * Utility function to convert a {@link StreamShardHandle} into a {@link KinesisStreamShardV2}.
+	 *
+	 * @param streamShardHandle the {@link StreamShardHandle} to be converted
+	 * @return a {@link KinesisStreamShardV2} object
+	 */
+	public static KinesisStreamShardV2 createKinesisStreamShardV2(StreamShardHandle streamShardHandle) {
+		KinesisStreamShardV2 kinesisStreamShardV2 = new KinesisStreamShardV2();
+
+		kinesisStreamShardV2.setStreamName(streamShardHandle.getStreamName());
+		kinesisStreamShardV2.setShardId(streamShardHandle.getShard().getShardId());
+		kinesisStreamShardV2.setParentShardId(streamShardHandle.getShard().getParentShardId());
+		kinesisStreamShardV2.setAdjacentParentShardId(streamShardHandle.getShard().getAdjacentParentShardId());
+		if (streamShardHandle.getShard().getHashKeyRange() != null) {
+			kinesisStreamShardV2.setStartingHashKey(streamShardHandle.getShard().getHashKeyRange().getStartingHashKey());
+			kinesisStreamShardV2.setEndingHashKey(streamShardHandle.getShard().getHashKeyRange().getEndingHashKey());
+		}
+		if (streamShardHandle.getShard().getSequenceNumberRange() != null) {
+			kinesisStreamShardV2.setStartingSequenceNumber(streamShardHandle.getShard().getSequenceNumberRange().getStartingSequenceNumber());
+			kinesisStreamShardV2.setEndingSequenceNumber(streamShardHandle.getShard().getSequenceNumberRange().getEndingSequenceNumber());
+		}
+
+		return kinesisStreamShardV2;
+	}
+
+	/**
+	 * Utility function to convert a {@link KinesisStreamShardV2} into a {@link StreamShardHandle}.
+	 *
+	 * @param kinesisStreamShard the {@link KinesisStreamShardV2} to be converted
+	 * @return a {@link StreamShardHandle} object
+	 */
+	public static StreamShardHandle createStreamShardHandle(KinesisStreamShardV2 kinesisStreamShard) {
+		Shard shard = new Shard();
+		shard.withShardId(kinesisStreamShard.getShardId());
+		shard.withParentShardId(kinesisStreamShard.getParentShardId());
+		shard.withAdjacentParentShardId(kinesisStreamShard.getAdjacentParentShardId());
+
+		if (kinesisStreamShard.getStartingHashKey() != null && kinesisStreamShard.getEndingHashKey() != null) {
+			HashKeyRange hashKeyRange = new HashKeyRange();
+			hashKeyRange.withStartingHashKey(kinesisStreamShard.getStartingHashKey());
+			hashKeyRange.withEndingHashKey(kinesisStreamShard.getEndingHashKey());
+			shard.withHashKeyRange(hashKeyRange);
+		}
+
+		if (kinesisStreamShard.getStartingSequenceNumber() != null && kinesisStreamShard.getEndingSequenceNumber() != null) {
+			SequenceNumberRange sequenceNumberRange = new SequenceNumberRange();
+			sequenceNumberRange.withStartingSequenceNumber(kinesisStreamShard.getStartingSequenceNumber());
+			sequenceNumberRange.withEndingSequenceNumber(kinesisStreamShard.getEndingSequenceNumber());
+			shard.withSequenceNumberRange(sequenceNumberRange);
+		}
+
+		return new StreamShardHandle(kinesisStreamShard.getStreamName(), shard);
+	}
 }
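
As a usage sketch (not part of the patch; the stream name and shard id below are invented), the two utilities above round-trip a shard description between the runtime handle and its serializable checkpoint form:

    Shard shard = new Shard().withShardId("shardId-000000000000");
    StreamShardHandle handle = new StreamShardHandle("exampleStream", shard);

    // to the serializable checkpoint representation ...
    KinesisStreamShardV2 snapshot = KinesisDataFetcher.createKinesisStreamShardV2(handle);

    // ... and back when restoring; stream name, shard id, and the optional
    // hash key / sequence number ranges survive the round trip
    StreamShardHandle restored = KinesisDataFetcher.createStreamShardHandle(snapshot);
    assert handle.equals(restored);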

http://git-wip-us.apache.org/repos/asf/flink/blob/64ca1aa5/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index ca85854..a724b49 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -24,7 +24,7 @@ import com.amazonaws.services.kinesis.model.Record;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
@@ -60,7 +60,7 @@ public class ShardConsumer<T> implements Runnable {
 
 	private final KinesisDataFetcher<T> fetcherRef;
 
-	private final KinesisStreamShard subscribedShard;
+	private final StreamShardHandle subscribedShard;
 
 	private final int maxNumberOfRecordsPerFetch;
 	private final long fetchIntervalMillis;
@@ -79,7 +79,7 @@ public class ShardConsumer<T> implements Runnable {
 	 */
 	public ShardConsumer(KinesisDataFetcher<T> fetcherRef,
 						Integer subscribedShardStateIndex,
-						KinesisStreamShard subscribedShard,
+						StreamShardHandle subscribedShard,
 						SequenceNumber lastSequenceNum) {
 		this(fetcherRef,
 			subscribedShardStateIndex,
@@ -91,7 +91,7 @@ public class ShardConsumer<T> implements Runnable {
 	/** This constructor is exposed for testing purposes */
 	protected ShardConsumer(KinesisDataFetcher<T> fetcherRef,
 							Integer subscribedShardStateIndex,
-							KinesisStreamShard subscribedShard,
+							StreamShardHandle subscribedShard,
 							SequenceNumber lastSequenceNum,
 							KinesisProxyInterface kinesis) {
 		this.fetcherRef = checkNotNull(fetcherRef);

http://git-wip-us.apache.org/repos/asf/flink/blob/64ca1aa5/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
index 53ed11b..f3dcfe1 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
@@ -24,9 +24,8 @@ import java.io.Serializable;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A serializable representation of a AWS Kinesis Stream shard. It is basically a wrapper class around the information
- * provided along with {@link com.amazonaws.services.kinesis.model.Shard}, with some extra utility methods to
- * determine whether or not a shard is closed and whether or not the shard is a result of parent shard splits or merges.
+ * A legacy serializable representation of an AWS Kinesis Stream shard. It is basically a wrapper class around the information
+ * provided along with {@link com.amazonaws.services.kinesis.model.Shard}.
  */
 public class KinesisStreamShard implements Serializable {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/64ca1aa5/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
index 00181da..e68129d 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
@@ -18,22 +18,28 @@
 package org.apache.flink.streaming.connectors.kinesis.model;
 
 /**
- * A wrapper class that bundles a {@link KinesisStreamShard} with its last processed sequence number.
+ * A wrapper class that bundles a {@link StreamShardHandle} with its last processed sequence number.
  */
 public class KinesisStreamShardState {
 
-	private KinesisStreamShard kinesisStreamShard;
+	private KinesisStreamShardV2 kinesisStreamShard;
+	private StreamShardHandle streamShardHandle;
 	private SequenceNumber lastProcessedSequenceNum;
 
-	public KinesisStreamShardState(KinesisStreamShard kinesisStreamShard, SequenceNumber lastProcessedSequenceNum) {
+	public KinesisStreamShardState(KinesisStreamShardV2 kinesisStreamShard, StreamShardHandle streamShardHandle, SequenceNumber lastProcessedSequenceNum) {
 		this.kinesisStreamShard = kinesisStreamShard;
+		this.streamShardHandle = streamShardHandle;
 		this.lastProcessedSequenceNum = lastProcessedSequenceNum;
 	}
 
-	public KinesisStreamShard getKinesisStreamShard() {
+	public KinesisStreamShardV2 getKinesisStreamShard() {
 		return this.kinesisStreamShard;
 	}
 
+	public StreamShardHandle getStreamShardHandle() {
+		return this.streamShardHandle;
+	}
+
 	public SequenceNumber getLastProcessedSequenceNum() {
 		return this.lastProcessedSequenceNum;
 	}
@@ -46,6 +52,7 @@ public class KinesisStreamShardState {
 	public String toString() {
 		return "KinesisStreamShardState{" +
 			"kinesisStreamShard='" + kinesisStreamShard.toString() + "'" +
+			", streamShardHandle='" + streamShardHandle.toString() + "'" +
 			", lastProcessedSequenceNumber='" + lastProcessedSequenceNum.toString() + "'}";
 	}
 
@@ -61,11 +68,13 @@ public class KinesisStreamShardState {
 
 		KinesisStreamShardState other = (KinesisStreamShardState) obj;
 
-		return kinesisStreamShard.equals(other.getKinesisStreamShard()) && lastProcessedSequenceNum.equals(other.getLastProcessedSequenceNum());
+		return kinesisStreamShard.equals(other.getKinesisStreamShard()) &&
+			streamShardHandle.equals(other.getStreamShardHandle()) &&
+			lastProcessedSequenceNum.equals(other.getLastProcessedSequenceNum());
 	}
 
 	@Override
 	public int hashCode() {
-		return 37 * (kinesisStreamShard.hashCode() + lastProcessedSequenceNum.hashCode());
+		return 37 * (kinesisStreamShard.hashCode() + streamShardHandle.hashCode() + lastProcessedSequenceNum.hashCode());
 	}
 }
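
A sketch of how the reworked state object is assembled (identifiers invented for illustration): the fetcher now tracks both representations side by side, the POJO for checkpointing and the handle for talking to Kinesis:

    StreamShardHandle handle = new StreamShardHandle("exampleStream",
        new Shard().withShardId("shardId-000000000000"));
    KinesisStreamShardState shardState = new KinesisStreamShardState(
        KinesisDataFetcher.createKinesisStreamShardV2(handle), // serializable form, returned by snapshotState()
        handle,                                                // runtime form, handed to the ShardConsumer
        new SequenceNumber("987654321"));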

http://git-wip-us.apache.org/repos/asf/flink/blob/64ca1aa5/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardV2.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardV2.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardV2.java
new file mode 100644
index 0000000..71cb6fa
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardV2.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.model;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * A serializable representation of an AWS Kinesis Stream shard. It is basically a wrapper class around the information
+ * extracted from {@link com.amazonaws.services.kinesis.model.Shard} and its nested classes.
+ */
+public class KinesisStreamShardV2 implements Serializable {
+
+	private static final long serialVersionUID = 5134869582298563604L;
+
+	private String streamName;
+	private String shardId;
+	private String parentShardId;
+	private String adjacentParentShardId;
+	private String startingHashKey;
+	private String endingHashKey;
+	private String startingSequenceNumber;
+	private String endingSequenceNumber;
+
+	public void setStreamName(String streamName) {
+		this.streamName = streamName;
+	}
+
+	public void setShardId(String shardId) {
+		this.shardId = shardId;
+	}
+
+	public void setParentShardId(String parentShardId) {
+		this.parentShardId = parentShardId;
+	}
+
+	public void setAdjacentParentShardId(String adjacentParentShardId) {
+		this.adjacentParentShardId = adjacentParentShardId;
+	}
+
+	public void setStartingHashKey(String startingHashKey) {
+		this.startingHashKey = startingHashKey;
+	}
+
+	public void setEndingHashKey(String endingHashKey) {
+		this.endingHashKey = endingHashKey;
+	}
+
+	public void setStartingSequenceNumber(String startingSequenceNumber) {
+		this.startingSequenceNumber = startingSequenceNumber;
+	}
+
+	public void setEndingSequenceNumber(String endingSequenceNumber) {
+		this.endingSequenceNumber = endingSequenceNumber;
+	}
+
+	public String getStreamName() {
+		return this.streamName;
+	}
+
+	public String getShardId() {
+		return this.shardId;
+	}
+
+	public String getParentShardId() {
+		return this.parentShardId;
+	}
+
+	public String getAdjacentParentShardId() {
+		return this.adjacentParentShardId;
+	}
+
+	public String getStartingHashKey() {
+		return this.startingHashKey;
+	}
+
+	public String getEndingHashKey() {
+		return this.endingHashKey;
+	}
+
+	public String getStartingSequenceNumber() {
+		return this.startingSequenceNumber;
+	}
+
+	public String getEndingSequenceNumber() {
+		return this.endingSequenceNumber;
+	}
+
+	@Override
+	public String toString() {
+		return "KinesisStreamShardV2{" +
+			"streamName='" + streamName + "'" +
+			", shardId='" + shardId + "'" +
+			", parentShardId='" + parentShardId + "'" +
+			", adjacentParentShardId='" + adjacentParentShardId + "'" +
+			", startingHashKey='" + startingHashKey + "'" +
+			", endingHashKey='" + endingHashKey + "'" +
+			", startingSequenceNumber='" + startingSequenceNumber + "'" +
+			", endingSequenceNumber='" + endingSequenceNumber + "'}";
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (!(obj instanceof KinesisStreamShardV2)) {
+			return false;
+		}
+
+		if (obj == this) {
+			return true;
+		}
+
+		KinesisStreamShardV2 other = (KinesisStreamShardV2) obj;
+
+		return streamName.equals(other.getStreamName()) &&
+			shardId.equals(other.getShardId()) &&
+			Objects.equals(parentShardId, other.getParentShardId()) &&
+			Objects.equals(adjacentParentShardId, other.getAdjacentParentShardId()) &&
+			Objects.equals(startingHashKey, other.getStartingHashKey()) &&
+			Objects.equals(endingHashKey, other.getEndingHashKey()) &&
+			Objects.equals(startingSequenceNumber, other.getStartingSequenceNumber()) &&
+			Objects.equals(endingSequenceNumber, other.getEndingSequenceNumber());
+	}
+
+	@Override
+	public int hashCode() {
+		int hash = 17;
+
+		if (streamName != null) {
+			hash = 37 * hash + streamName.hashCode();
+		}
+		if (shardId != null) {
+			hash = 37 * hash + shardId.hashCode();
+		}
+		if (parentShardId != null) {
+			hash = 37 * hash + parentShardId.hashCode();
+		}
+		if (adjacentParentShardId != null) {
+			hash = 37 * hash + adjacentParentShardId.hashCode();
+		}
+		if (startingHashKey != null) {
+			hash = 37 * hash + startingHashKey.hashCode();
+		}
+		if (endingHashKey != null) {
+			hash = 37 * hash + endingHashKey.hashCode();
+		}
+		if (startingSequenceNumber != null) {
+			hash = 37 * hash + startingSequenceNumber.hashCode();
+		}
+		if (endingSequenceNumber != null) {
+			hash = 37 * hash + endingSequenceNumber.hashCode();
+		}
+
+		return hash;
+	}
+
+}
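
Because the class has a default constructor and a public getter and setter for every field, it qualifies as a Flink POJO. A minimal check, mirroring the new unit test further below:

    TypeInformation<KinesisStreamShardV2> typeInfo = TypeInformation.of(KinesisStreamShardV2.class);
    // Flink picks the PojoSerializer instead of falling back to Kryo
    assertTrue(typeInfo.createSerializer(new ExecutionConfig()) instanceof PojoSerializer);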

http://git-wip-us.apache.org/repos/asf/flink/blob/64ca1aa5/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardHandle.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardHandle.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardHandle.java
new file mode 100644
index 0000000..d340a88
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardHandle.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.model;
+
+import com.amazonaws.services.kinesis.model.Shard;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A wrapper class around the stream name and {@link com.amazonaws.services.kinesis.model.Shard} information,
+ * with some extra utility methods to determine whether a shard is closed and whether it is
+ * the result of parent shard splits or merges.
+ */
+public class StreamShardHandle {
+
+	private final String streamName;
+	private final Shard shard;
+
+	private final int cachedHash;
+
+	/**
+	 * Create a new StreamShardHandle.
+	 *
+	 * @param streamName
+	 *           the name of the Kinesis stream that this shard belongs to
+	 * @param shard
+	 *           the actual AWS Shard instance that will be wrapped within this StreamShardHandle
+	 */
+	public StreamShardHandle(String streamName, Shard shard) {
+		this.streamName = checkNotNull(streamName);
+		this.shard = checkNotNull(shard);
+
+		// since a Kinesis Streams shard is fully identified by its stream name and shard id,
+		// the hash doesn't need the hash code of Amazon's Shard description, which factors in other information
+		int hash = 17;
+		hash = 37 * hash + streamName.hashCode();
+		hash = 37 * hash + shard.getShardId().hashCode();
+		this.cachedHash = hash;
+	}
+
+	public String getStreamName() {
+		return streamName;
+	}
+
+	public boolean isClosed() {
+		return (shard.getSequenceNumberRange().getEndingSequenceNumber() != null);
+	}
+
+	public Shard getShard() {
+		return shard;
+	}
+
+	@Override
+	public String toString() {
+		return "StreamShardHandle{" +
+			"streamName='" + streamName + "'" +
+			", shard='" + shard.toString() + "'}";
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (!(obj instanceof StreamShardHandle)) {
+			return false;
+		}
+
+		if (obj == this) {
+			return true;
+		}
+
+		StreamShardHandle other = (StreamShardHandle) obj;
+
+		return streamName.equals(other.getStreamName()) && shard.equals(other.getShard());
+	}
+
+	@Override
+	public int hashCode() {
+		return cachedHash;
+	}
+
+	/**
+	 * Utility function to compare two shard ids.
+	 *
+	 * @param firstShardId first shard id to compare
+	 * @param secondShardId second shard id to compare
+	 * @return a value less than 0 if the first shard id is smaller than the second shard id,
+	 *         or a value larger than 0 if the first shard id is larger than the second shard id,
+	 *         or 0 if they are equal
+	 */
+	public static int compareShardIds(String firstShardId, String secondShardId) {
+		if (!isValidShardId(firstShardId)) {
+			throw new IllegalArgumentException("The first shard id has invalid format.");
+		}
+
+		if (!isValidShardId(secondShardId)) {
+			throw new IllegalArgumentException("The second shard id has invalid format.");
+		}
+
+		// digit segment of the shard id starts at index 8
+		return Long.compare(Long.parseLong(firstShardId.substring(8)), Long.parseLong(secondShardId.substring(8)));
+	}
+
+	/**
+	 * Checks if a shard id has a valid format.
+	 * Kinesis stream shard ids are 12-digit numbers left-padded with 0's
+	 * and prefixed with "shardId-", e.g. "shardId-000000000015".
+	 *
+	 * @param shardId the shard id to check
+	 * @return whether the shard id is valid
+	 */
+	public static boolean isValidShardId(String shardId) {
+		if (shardId == null) { return false; }
+		return shardId.matches("^shardId-\\d{12}");
+	}
+}
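
A short sketch of the shard id utilities (the digit segment starts at index 8, right after the "shardId-" prefix):

    StreamShardHandle.isValidShardId("shardId-000000000015"); // true
    StreamShardHandle.isValidShardId("15");                   // false: missing prefix and zero-padding
    StreamShardHandle.compareShardIds(
        "shardId-000000000002", "shardId-000000000015");      // negative, since 2 < 15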

http://git-wip-us.apache.org/repos/asf/flink/blob/64ca1aa5/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
index 04b1654..aadb31c 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kinesis.proxy;
 
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 
 import java.util.LinkedList;
 import java.util.List;
@@ -30,25 +30,25 @@ import java.util.Set;
  */
 public class GetShardListResult {
 
-	private final Map<String, LinkedList<KinesisStreamShard>> streamsToRetrievedShardList = new HashMap<>();
+	private final Map<String, LinkedList<StreamShardHandle>> streamsToRetrievedShardList = new HashMap<>();
 
-	public void addRetrievedShardToStream(String stream, KinesisStreamShard retrievedShard) {
+	public void addRetrievedShardToStream(String stream, StreamShardHandle retrievedShard) {
 		if (!streamsToRetrievedShardList.containsKey(stream)) {
-			streamsToRetrievedShardList.put(stream, new LinkedList<KinesisStreamShard>());
+			streamsToRetrievedShardList.put(stream, new LinkedList<StreamShardHandle>());
 		}
 		streamsToRetrievedShardList.get(stream).add(retrievedShard);
 	}
 
-	public void addRetrievedShardsToStream(String stream, List<KinesisStreamShard> retrievedShards) {
+	public void addRetrievedShardsToStream(String stream, List<StreamShardHandle> retrievedShards) {
 		if (retrievedShards.size() != 0) {
 			if (!streamsToRetrievedShardList.containsKey(stream)) {
-				streamsToRetrievedShardList.put(stream, new LinkedList<KinesisStreamShard>());
+				streamsToRetrievedShardList.put(stream, new LinkedList<StreamShardHandle>());
 			}
 			streamsToRetrievedShardList.get(stream).addAll(retrievedShards);
 		}
 	}
 
-	public List<KinesisStreamShard> getRetrievedShardListOfStream(String stream) {
+	public List<StreamShardHandle> getRetrievedShardListOfStream(String stream) {
 		if (!streamsToRetrievedShardList.containsKey(stream)) {
 			return null;
 		} else {
@@ -56,7 +56,7 @@ public class GetShardListResult {
 		}
 	}
 
-	public KinesisStreamShard getLastSeenShardOfStream(String stream) {
+	public StreamShardHandle getLastSeenShardOfStream(String stream) {
 		if (!streamsToRetrievedShardList.containsKey(stream)) {
 			return null;
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/64ca1aa5/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 580555f..70c1286 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -32,7 +32,7 @@ import com.amazonaws.services.kinesis.model.Shard;
 import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -237,7 +237,7 @@ public class KinesisProxy implements KinesisProxyInterface {
 	 * {@inheritDoc}
 	 */
 	@Override
-	public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable Object startingMarker) throws InterruptedException {
+	public String getShardIterator(StreamShardHandle shard, String shardIteratorType, @Nullable Object startingMarker) throws InterruptedException {
 		GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
 			.withStreamName(shard.getStreamName())
 			.withShardId(shard.getShard().getShardId())
@@ -315,8 +315,8 @@ public class KinesisProxy implements KinesisProxyInterface {
 		}
 	}
 
-	private List<KinesisStreamShard> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException {
-		List<KinesisStreamShard> shardsOfStream = new ArrayList<>();
+	private List<StreamShardHandle> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException {
+		List<StreamShardHandle> shardsOfStream = new ArrayList<>();
 
 		DescribeStreamResult describeStreamResult;
 		do {
@@ -324,7 +324,7 @@ public class KinesisProxy implements KinesisProxyInterface {
 
 			List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
 			for (Shard shard : shards) {
-				shardsOfStream.add(new KinesisStreamShard(streamName, shard));
+				shardsOfStream.add(new StreamShardHandle(streamName, shard));
 			}
 
 			if (shards.size() != 0) {
@@ -384,7 +384,7 @@ public class KinesisProxy implements KinesisProxyInterface {
 			List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
 			Iterator<Shard> shardItr = shards.iterator();
 			while (shardItr.hasNext()) {
-				if (KinesisStreamShard.compareShardIds(shardItr.next().getShardId(), startShardId) <= 0) {
+				if (StreamShardHandle.compareShardIds(shardItr.next().getShardId(), startShardId) <= 0) {
 					shardItr.remove();
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/64ca1aa5/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
index 9f6d594..807a163 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
@@ -18,7 +18,7 @@
 package org.apache.flink.streaming.connectors.kinesis.proxy;
 
 import com.amazonaws.services.kinesis.model.GetRecordsResult;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 
 import java.util.Map;
 
@@ -43,7 +43,7 @@ public interface KinesisProxyInterface {
 	 *                              operation has exceeded the rate limit; this exception will be thrown
 	 *                              if the backoff is interrupted.
 	 */
-	String getShardIterator(KinesisStreamShard shard, String shardIteratorType, Object startingMarker) throws InterruptedException;
+	String getShardIterator(StreamShardHandle shard, String shardIteratorType, Object startingMarker) throws InterruptedException;
 
 	/**
 	 * Get the next batch of data records using a specific shard iterator

http://git-wip-us.apache.org/repos/asf/flink/blob/64ca1aa5/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
index ec9a9b5..d2af6ad 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardV2;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
@@ -101,9 +102,9 @@ public class FlinkKinesisConsumerMigrationTest {
 		testHarness.open();
 
 		// the expected state in "kafka-consumer-migration-test-flink1.1-snapshot"
-		final HashMap<KinesisStreamShard, SequenceNumber> expectedState = new HashMap<>();
-		expectedState.put(new KinesisStreamShard("fakeStream1",
-				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
+		final HashMap<KinesisStreamShardV2, SequenceNumber> expectedState = new HashMap<>();
+		expectedState.put(FlinkKinesisConsumer.createKinesisStreamShardV2(new KinesisStreamShard("fakeStream1",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)))),
 			new SequenceNumber("987654321"));
 
 		// assert that state is correctly restored from legacy checkpoint

http://git-wip-us.apache.org/repos/asf/flink/blob/64ca1aa5/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index 4b178c7..760858a 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -17,12 +17,17 @@
 
 package org.apache.flink.streaming.connectors.kinesis;
 
+import com.amazonaws.services.kinesis.model.HashKeyRange;
+import com.amazonaws.services.kinesis.model.SequenceNumberRange;
 import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.state.StateInitializationContext;
@@ -36,6 +41,8 @@ import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardV2;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
@@ -62,6 +69,7 @@ import java.util.UUID;
 import static org.junit.Assert.fail;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
@@ -109,8 +117,8 @@ public class FlinkKinesisConsumerTest {
 	@Test
 	public void testCredentialProviderTypeSetToBasicButNoCredentialSetInConfig() {
 		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("Please set values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " +
-				"and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.");
+		exception.expectMessage("Please set values for AWS Access Key ID ('" + AWSConfigConstants.AWS_ACCESS_KEY_ID + "') " +
+			"and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.");
 
 		Properties testConfig = new Properties();
 		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
@@ -535,28 +543,26 @@ public class FlinkKinesisConsumerTest {
 		config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
 		config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
 
-		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
-
-		List<Tuple2<KinesisStreamShard, SequenceNumber>> globalUnionState = new ArrayList<>(4);
+		List<Tuple2<KinesisStreamShardV2, SequenceNumber>> globalUnionState = new ArrayList<>(4);
 		globalUnionState.add(Tuple2.of(
-			new KinesisStreamShard("fakeStream",
-				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
+			KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)))),
 			new SequenceNumber("1")));
 		globalUnionState.add(Tuple2.of(
-			new KinesisStreamShard("fakeStream",
-				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
+			KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1)))),
 			new SequenceNumber("1")));
 		globalUnionState.add(Tuple2.of(
-			new KinesisStreamShard("fakeStream",
-				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
+			KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2)))),
 			new SequenceNumber("1")));
 		globalUnionState.add(Tuple2.of(
-			new KinesisStreamShard("fakeStream",
-				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(3))),
+			KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(3)))),
 			new SequenceNumber("1")));
 
-		TestingListState<Tuple2<KinesisStreamShard, SequenceNumber>> listState = new TestingListState<>();
-		for (Tuple2<KinesisStreamShard, SequenceNumber> state : globalUnionState) {
+		TestingListState<Tuple2<KinesisStreamShardV2, SequenceNumber>> listState = new TestingListState<>();
+		for (Tuple2<KinesisStreamShardV2, SequenceNumber> state : globalUnionState) {
 			listState.add(state);
 		}
 
@@ -566,10 +572,10 @@ public class FlinkKinesisConsumerTest {
 		when(context.getNumberOfParallelSubtasks()).thenReturn(2);
 		consumer.setRuntimeContext(context);
 
+		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
 		when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
 
 		StateInitializationContext initializationContext = mock(StateInitializationContext.class);
-
 		when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
 		when(initializationContext.isRestored()).thenReturn(true);
 
@@ -600,32 +606,32 @@ public class FlinkKinesisConsumerTest {
 		config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
 		config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
 
-		ArrayList<Tuple2<KinesisStreamShard, SequenceNumber>> initialState = new ArrayList<>(1);
+		ArrayList<Tuple2<KinesisStreamShardV2, SequenceNumber>> initialState = new ArrayList<>(1);
 		initialState.add(Tuple2.of(
-			new KinesisStreamShard("fakeStream1",
-				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
+			KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream1",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)))),
 			new SequenceNumber("1")));
 
-		ArrayList<Tuple2<KinesisStreamShard, SequenceNumber>> expectedStateSnapshot = new ArrayList<>(3);
+		ArrayList<Tuple2<KinesisStreamShardV2, SequenceNumber>> expectedStateSnapshot = new ArrayList<>(3);
 		expectedStateSnapshot.add(Tuple2.of(
-			new KinesisStreamShard("fakeStream1",
-				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
+			KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream1",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)))),
 			new SequenceNumber("12")));
 		expectedStateSnapshot.add(Tuple2.of(
-			new KinesisStreamShard("fakeStream1",
-				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
+			KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream1",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1)))),
 			new SequenceNumber("11")));
 		expectedStateSnapshot.add(Tuple2.of(
-			new KinesisStreamShard("fakeStream1",
-				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
+			KinesisDataFetcher.createKinesisStreamShardV2(new StreamShardHandle("fakeStream1",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2)))),
 			new SequenceNumber("31")));
 
 		// ----------------------------------------------------------------------
 		// mock operator state backend and initial state for initializeState()
 		// ----------------------------------------------------------------------
 
-		TestingListState<Tuple2<KinesisStreamShard, SequenceNumber>> listState = new TestingListState<>();
-		for (Tuple2<KinesisStreamShard, SequenceNumber> state: initialState) {
+		TestingListState<Tuple2<KinesisStreamShardV2, SequenceNumber>> listState = new TestingListState<>();
+		for (Tuple2<KinesisStreamShardV2, SequenceNumber> state : initialState) {
 			listState.add(state);
 		}
 
@@ -640,8 +646,8 @@ public class FlinkKinesisConsumerTest {
 		// mock a running fetcher and its state for snapshot
 		// ----------------------------------------------------------------------
 
-		HashMap<KinesisStreamShard, SequenceNumber> stateSnapshot = new HashMap<>();
-		for (Tuple2<KinesisStreamShard, SequenceNumber> tuple: expectedStateSnapshot) {
+		HashMap<KinesisStreamShardV2, SequenceNumber> stateSnapshot = new HashMap<>();
+		for (Tuple2<KinesisStreamShardV2, SequenceNumber> tuple : expectedStateSnapshot) {
 			stateSnapshot.put(tuple.f0, tuple.f1);
 		}
 
@@ -668,15 +674,15 @@ public class FlinkKinesisConsumerTest {
 		assertEquals(true, listState.clearCalled);
 		assertEquals(3, listState.getList().size());
 
-		for (Tuple2<KinesisStreamShard, SequenceNumber> state: initialState) {
-			for (Tuple2<KinesisStreamShard, SequenceNumber> currentState: listState.getList()) {
+		for (Tuple2<KinesisStreamShardV2, SequenceNumber> state : initialState) {
+			for (Tuple2<KinesisStreamShardV2, SequenceNumber> currentState : listState.getList()) {
 				assertNotEquals(state, currentState);
 			}
 		}
 
-		for (Tuple2<KinesisStreamShard, SequenceNumber> state: expectedStateSnapshot) {
+		for (Tuple2<KinesisStreamShardV2, SequenceNumber> state : expectedStateSnapshot) {
 			boolean hasOneIsSame = false;
-			for (Tuple2<KinesisStreamShard, SequenceNumber> currentState: listState.getList()) {
+			for (Tuple2<KinesisStreamShardV2, SequenceNumber> currentState : listState.getList()) {
 				hasOneIsSame = hasOneIsSame || state.equals(currentState);
 			}
 			assertEquals(true, hasOneIsSame);
@@ -706,10 +712,14 @@ public class FlinkKinesisConsumerTest {
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testFetcherShouldBeCorrectlySeededIfRestoringFromLegacyCheckpoint() throws Exception {
-		HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");
+		HashMap<StreamShardHandle, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");
+		HashMap<KinesisStreamShard, SequenceNumber> legacyFakeRestoredState = new HashMap<>();
+		for (Map.Entry<StreamShardHandle, SequenceNumber> kv : fakeRestoredState.entrySet()) {
+			legacyFakeRestoredState.put(new KinesisStreamShard(kv.getKey().getStreamName(), kv.getKey().getShard()), kv.getValue());
+		}
 
 		KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
-		List<KinesisStreamShard> shards = new ArrayList<>();
+		List<StreamShardHandle> shards = new ArrayList<>();
 		shards.addAll(fakeRestoredState.keySet());
 		when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
 		PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
@@ -720,13 +730,14 @@ public class FlinkKinesisConsumerTest {
 
 		TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
 			"fakeStream", new Properties(), 10, 2);
-		consumer.restoreState(fakeRestoredState);
+		consumer.restoreState(legacyFakeRestoredState);
 		consumer.open(new Configuration());
 		consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
 
-		for (Map.Entry<KinesisStreamShard, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
+		for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
 			Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
-				new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue()));
+				new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredShard.getKey()),
+					restoredShard.getKey(), restoredShard.getValue()));
 		}
 	}
 
@@ -738,15 +749,15 @@ public class FlinkKinesisConsumerTest {
 		// setup initial state
 		// ----------------------------------------------------------------------
 
-		HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");
+		HashMap<StreamShardHandle, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");
 
 		// ----------------------------------------------------------------------
 		// mock operator state backend and initial state for initializeState()
 		// ----------------------------------------------------------------------
 
-		TestingListState<Tuple2<KinesisStreamShard, SequenceNumber>> listState = new TestingListState<>();
-		for (Map.Entry<KinesisStreamShard, SequenceNumber> state: fakeRestoredState.entrySet()) {
-			listState.add(Tuple2.of(state.getKey(), state.getValue()));
+		TestingListState<Tuple2<KinesisStreamShardV2, SequenceNumber>> listState = new TestingListState<>();
+		for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredState.entrySet()) {
+			listState.add(Tuple2.of(KinesisDataFetcher.createKinesisStreamShardV2(state.getKey()), state.getValue()));
 		}
 
 		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
@@ -761,7 +772,7 @@ public class FlinkKinesisConsumerTest {
 		// ----------------------------------------------------------------------
 
 		KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
-		List<KinesisStreamShard> shards = new ArrayList<>();
+		List<StreamShardHandle> shards = new ArrayList<>();
 		shards.addAll(fakeRestoredState.keySet());
 		when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
 		PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
@@ -780,9 +791,10 @@ public class FlinkKinesisConsumerTest {
 		consumer.open(new Configuration());
 		consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
 
-		for (Map.Entry<KinesisStreamShard, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
+		for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
 			Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
-				new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue()));
+				new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredShard.getKey()),
+					restoredShard.getKey(), restoredShard.getValue()));
 		}
 	}
 
@@ -794,20 +806,20 @@ public class FlinkKinesisConsumerTest {
 		// setup initial state
 		// ----------------------------------------------------------------------
 
-		HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = getFakeRestoredStore("fakeStream1");
+		HashMap<StreamShardHandle, SequenceNumber> fakeRestoredState = getFakeRestoredStore("fakeStream1");
 
-		HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredStateForOthers = getFakeRestoredStore("fakeStream2");
+		HashMap<StreamShardHandle, SequenceNumber> fakeRestoredStateForOthers = getFakeRestoredStore("fakeStream2");
 
 		// ----------------------------------------------------------------------
 		// mock operator state backend and initial state for initializeState()
 		// ----------------------------------------------------------------------
 
-		TestingListState<Tuple2<KinesisStreamShard, SequenceNumber>> listState = new TestingListState<>();
-		for (Map.Entry<KinesisStreamShard, SequenceNumber> state: fakeRestoredState.entrySet()) {
-			listState.add(Tuple2.of(state.getKey(), state.getValue()));
+		TestingListState<Tuple2<KinesisStreamShardV2, SequenceNumber>> listState = new TestingListState<>();
+		for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredState.entrySet()) {
+			listState.add(Tuple2.of(KinesisDataFetcher.createKinesisStreamShardV2(state.getKey()), state.getValue()));
 		}
-		for (Map.Entry<KinesisStreamShard, SequenceNumber> state: fakeRestoredStateForOthers.entrySet()) {
-			listState.add(Tuple2.of(state.getKey(), state.getValue()));
+		for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredStateForOthers.entrySet()) {
+			listState.add(Tuple2.of(KinesisDataFetcher.createKinesisStreamShardV2(state.getKey()), state.getValue()));
 		}
 
 		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
@@ -822,7 +834,7 @@ public class FlinkKinesisConsumerTest {
 		// ----------------------------------------------------------------------
 
 		KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
-		List<KinesisStreamShard> shards = new ArrayList<>();
+		List<StreamShardHandle> shards = new ArrayList<>();
 		shards.addAll(fakeRestoredState.keySet());
 		when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
 		PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
@@ -841,15 +853,17 @@ public class FlinkKinesisConsumerTest {
 		consumer.open(new Configuration());
 		consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
 
-		for (Map.Entry<KinesisStreamShard, SequenceNumber> restoredShard : fakeRestoredStateForOthers.entrySet()) {
+		for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredStateForOthers.entrySet()) {
 			// should never get restored state not belonging to itself
 			Mockito.verify(mockedFetcher, never()).registerNewSubscribedShardState(
-				new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue()));
+				new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredShard.getKey()),
+					restoredShard.getKey(), restoredShard.getValue()));
 		}
-		for (Map.Entry<KinesisStreamShard, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
+		for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
 			// should get restored state belonging to itself
 			Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
-				new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue()));
+				new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredShard.getKey()),
+					restoredShard.getKey(), restoredShard.getValue()));
 		}
 	}
 
@@ -890,15 +904,15 @@ public class FlinkKinesisConsumerTest {
 		// setup initial state
 		// ----------------------------------------------------------------------
 
-		HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");
+		HashMap<StreamShardHandle, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");
 
 		// ----------------------------------------------------------------------
 		// mock operator state backend and initial state for initializeState()
 		// ----------------------------------------------------------------------
 
-		TestingListState<Tuple2<KinesisStreamShard, SequenceNumber>> listState = new TestingListState<>();
-		for (Map.Entry<KinesisStreamShard, SequenceNumber> state: fakeRestoredState.entrySet()) {
-			listState.add(Tuple2.of(state.getKey(), state.getValue()));
+		TestingListState<Tuple2<KinesisStreamShardV2, SequenceNumber>> listState = new TestingListState<>();
+		for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredState.entrySet()) {
+			listState.add(Tuple2.of(KinesisDataFetcher.createKinesisStreamShardV2(state.getKey()), state.getValue()));
 		}
 
 		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
@@ -913,9 +927,9 @@ public class FlinkKinesisConsumerTest {
 		// ----------------------------------------------------------------------
 
 		KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
-		List<KinesisStreamShard> shards = new ArrayList<>();
+		List<StreamShardHandle> shards = new ArrayList<>();
 		shards.addAll(fakeRestoredState.keySet());
-		shards.add(new KinesisStreamShard("fakeStream2",
+		shards.add(new StreamShardHandle("fakeStream2",
 			new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))));
 		when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
 		PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
@@ -934,15 +948,58 @@ public class FlinkKinesisConsumerTest {
 		consumer.open(new Configuration());
 		consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
 
-		fakeRestoredState.put(new KinesisStreamShard("fakeStream2",
+		fakeRestoredState.put(new StreamShardHandle("fakeStream2",
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
 			SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
-		for (Map.Entry<KinesisStreamShard, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
+		for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
 			Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
-				new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue()));
+				new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredShard.getKey()),
+					restoredShard.getKey(), restoredShard.getValue()));
 		}
 	}
 
+	@Test
+	public void testCreateFunctionToConvertBetweenKinesisStreamShardAndKinesisStreamShardV2() {
+		String streamName = "fakeStream1";
+		String shardId = "shard-000001";
+		String parentShardId = "shard-000002";
+		String adjacentParentShardId = "shard-000003";
+		String startingHashKey = "key-000001";
+		String endingHashKey = "key-000010";
+		String startingSequenceNumber = "seq-0000021";
+		String endingSequenceNumber = "seq-00000031";
+
+		KinesisStreamShardV2 kinesisStreamShardV2 = new KinesisStreamShardV2();
+		kinesisStreamShardV2.setStreamName(streamName);
+		kinesisStreamShardV2.setShardId(shardId);
+		kinesisStreamShardV2.setParentShardId(parentShardId);
+		kinesisStreamShardV2.setAdjacentParentShardId(adjacentParentShardId);
+		kinesisStreamShardV2.setStartingHashKey(startingHashKey);
+		kinesisStreamShardV2.setEndingHashKey(endingHashKey);
+		kinesisStreamShardV2.setStartingSequenceNumber(startingSequenceNumber);
+		kinesisStreamShardV2.setEndingSequenceNumber(endingSequenceNumber);
+
+		Shard shard = new Shard()
+			.withShardId(shardId)
+			.withParentShardId(parentShardId)
+			.withAdjacentParentShardId(adjacentParentShardId)
+			.withHashKeyRange(new HashKeyRange()
+				.withStartingHashKey(startingHashKey)
+				.withEndingHashKey(endingHashKey))
+			.withSequenceNumberRange(new SequenceNumberRange()
+				.withStartingSequenceNumber(startingSequenceNumber)
+				.withEndingSequenceNumber(endingSequenceNumber));
+		KinesisStreamShard kinesisStreamShard = new KinesisStreamShard(streamName, shard);
+
+		assertEquals(kinesisStreamShardV2, FlinkKinesisConsumer.createKinesisStreamShardV2(kinesisStreamShard));
+	}
+
+	@Test
+	public void testKinesisStreamShardV2WillUsePojoSerializer() {
+		TypeInformation<KinesisStreamShardV2> typeInformation = TypeInformation.of(KinesisStreamShardV2.class);
+		assertTrue(typeInformation.createSerializer(new ExecutionConfig()) instanceof PojoSerializer);
+	}
+
 	private static final class TestingListState<T> implements ListState<T> {
 
 		private final List<T> list = new ArrayList<>();
@@ -973,31 +1030,31 @@ public class FlinkKinesisConsumerTest {
 		}
 	}
 
-	private HashMap<KinesisStreamShard, SequenceNumber> getFakeRestoredStore(String streamName) {
-		HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = new HashMap<>();
+	private HashMap<StreamShardHandle, SequenceNumber> getFakeRestoredStore(String streamName) {
+		HashMap<StreamShardHandle, SequenceNumber> fakeRestoredState = new HashMap<>();
 
 		if (streamName.equals("fakeStream1") || streamName.equals("all")) {
 			fakeRestoredState.put(
-				new KinesisStreamShard("fakeStream1",
+				new StreamShardHandle("fakeStream1",
 					new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
 				new SequenceNumber(UUID.randomUUID().toString()));
 			fakeRestoredState.put(
-				new KinesisStreamShard("fakeStream1",
+				new StreamShardHandle("fakeStream1",
 					new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
 				new SequenceNumber(UUID.randomUUID().toString()));
 			fakeRestoredState.put(
-				new KinesisStreamShard("fakeStream1",
+				new StreamShardHandle("fakeStream1",
 					new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
 				new SequenceNumber(UUID.randomUUID().toString()));
 		}
 
 		if (streamName.equals("fakeStream2") || streamName.equals("all")) {
 			fakeRestoredState.put(
-				new KinesisStreamShard("fakeStream2",
+				new StreamShardHandle("fakeStream2",
 					new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
 				new SequenceNumber(UUID.randomUUID().toString()));
 			fakeRestoredState.put(
-				new KinesisStreamShard("fakeStream2",
+				new StreamShardHandle("fakeStream2",
 					new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
 				new SequenceNumber(UUID.randomUUID().toString()));
 		}


[4/8] flink git commit: [FLINK-6708] [yarn] Minor improvements to YARN session HA fixes

Posted by tz...@apache.org.
[FLINK-6708] [yarn] Minor improvements to YARN session HA fixes

This closes #3981.
This closes #3982.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2e138f10
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2e138f10
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2e138f10

Branch: refs/heads/release-1.3
Commit: 2e138f1009b30c91aa73a704cc175f9e61ca52ea
Parents: 9bc34bf
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri May 26 15:00:47 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri May 26 16:33:20 2017 +0800

----------------------------------------------------------------------
 .../apache/flink/yarn/AbstractYarnClusterDescriptor.java  |  6 +++++-
 .../org/apache/flink/yarn/cli/FlinkYarnSessionCli.java    |  2 +-
 .../main/scala/org/apache/flink/yarn/YarnJobManager.scala | 10 +++++++---
 3 files changed, 13 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2e138f10/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 818a3e8..044d1e7 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -1235,7 +1235,11 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			LOG.info("Deleting files in {}.", yarnFilesDir);
 			try {
 				FileSystem fs = FileSystem.get(conf);
-				fs.delete(yarnFilesDir, true);
+
+				if (!fs.delete(yarnFilesDir, true)) {
+					throw new IOException("Deleting files in " + yarnFilesDir + " was unsuccessful");
+				}
+
 				fs.close();
 			} catch (IOException e) {
 				LOG.error("Failed to delete Flink Jar and conf files in HDFS", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/2e138f10/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index d2a4340..53253d6 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -424,7 +424,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 						numTaskmanagers = status.numRegisteredTaskManagers();
 					}
 				} catch (Exception e) {
-					LOG.warn("Could not retrieve the current cluster status. Retrying...", e);
+					LOG.warn("Could not retrieve the current cluster status. Skipping current retrieval attempt ...", e);
 				}
 
 				List<String> messages = yarnCluster.getNewMessages();

http://git-wip-us.apache.org/repos/asf/flink/blob/2e138f10/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index 902553f..e094bb7 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -100,8 +100,8 @@ class YarnJobManager(
     handleYarnShutdown orElse super.handleMessage
   }
 
-  def handleYarnShutdown: Receive = {
-    case msg:StopCluster =>
+  private def handleYarnShutdown: Receive = {
+    case msg: StopCluster =>
       super.handleMessage(msg)
 
       // do global cleanup if the yarn files path has been set
@@ -113,7 +113,11 @@ class YarnJobManager(
 
           try {
             val fs = path.getFileSystem
-            fs.delete(path, true)
+
+            if (!fs.delete(path, true)) {
+              throw new IOException(s"Deleting yarn application files under $filePath " +
+                s"was unsuccessful.")
+            }
           } catch {
             case ioe: IOException =>
               log.warn(


[3/8] flink git commit: [FLINK-6708] [yarn] Harden FlinkYarnSessionCli to handle GetClusterStatusResponse exceptions

Posted by tz...@apache.org.
[FLINK-6708] [yarn] Harden FlinkYarnSessionCli to handle GetClusterStatusResponse exceptions

This PR hardens the FlinkYarnSessionCli by handling exceptions that occur when
retrieving the GetClusterStatusResponse. If no response is retrieved because an
exception is thrown, the CLI won't fail but will retry the retrieval on its
next polling iteration.
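
For reference, the hardening boils down to wrapping the status poll in a
try/catch so that a single failed retrieval does not abort the monitoring
loop. A minimal, self-contained Java sketch of that pattern follows; the
Supplier-based status source, the interval parameter, and the println
logging are hypothetical stand-ins for yarnCluster.getClusterStatus() and
the CLI's actual logging:

    import java.util.function.Supplier;

    public final class ResilientStatusPoller {

        /**
         * Polls the given status source forever. A failed poll is logged
         * and retried on the next iteration instead of ending the loop.
         */
        public static void pollForever(Supplier<Integer> taskManagerCount,
                long intervalMillis) throws InterruptedException {
            int numTaskmanagers = -1;
            while (true) {
                try {
                    // may throw, e.g. when the cluster is briefly unreachable
                    int count = taskManagerCount.get();
                    if (count != numTaskmanagers) {
                        System.err.println(
                            "Number of connected TaskManagers changed to " + count);
                        numTaskmanagers = count;
                    }
                } catch (Exception e) {
                    // skip this attempt; the next iteration retries
                    System.err.println(
                        "Could not retrieve the current cluster status: " + e);
                }
                Thread.sleep(intervalMillis);
            }
        }
    }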


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9bc34bfc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9bc34bfc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9bc34bfc

Branch: refs/heads/release-1.3
Commit: 9bc34bfcfe8638eed89ca6063da87148d1152016
Parents: 99e15dd
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed May 24 18:26:57 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri May 26 16:33:13 2017 +0800

----------------------------------------------------------------------
 .../apache/flink/yarn/cli/FlinkYarnSessionCli.java  | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9bc34bfc/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 1ece264..d2a4340 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -413,14 +413,18 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 			while (true) {
 				// ------------------ check if there are updates by the cluster -----------
 
-				GetClusterStatusResponse status = yarnCluster.getClusterStatus();
-				LOG.debug("Received status message: {}", status);
+				try {
+					GetClusterStatusResponse status = yarnCluster.getClusterStatus();
+					LOG.debug("Received status message: {}", status);
 
-				if (status != null && numTaskmanagers != status.numRegisteredTaskManagers()) {
-					System.err.println("Number of connected TaskManagers changed to " +
+					if (status != null && numTaskmanagers != status.numRegisteredTaskManagers()) {
+						System.err.println("Number of connected TaskManagers changed to " +
 							status.numRegisteredTaskManagers() + ". " +
-						"Slots available: " + status.totalNumberOfSlots());
-					numTaskmanagers = status.numRegisteredTaskManagers();
+							"Slots available: " + status.totalNumberOfSlots());
+						numTaskmanagers = status.numRegisteredTaskManagers();
+					}
+				} catch (Exception e) {
+					LOG.warn("Could not retrieve the current cluster status. Retrying...", e);
 				}
 
 				List<String> messages = yarnCluster.getNewMessages();


[2/8] flink git commit: [FLINK-6646] [yarn] Let YarnJobManager delete Yarn application files

Posted by tz...@apache.org.
[FLINK-6646] [yarn] Let YarnJobManager delete Yarn application files

Previously, the YarnClusterClient decided when to delete the Yarn application files.
This was problematic because the client does not know whether a Yarn application
is being restarted or terminated, so the files were always deleted. This
prevented Yarn from restarting a failed ApplicationMaster, effectively thwarting
Flink's HA capabilities.

The PR changes the behaviour such that the YarnJobManager deletes the Yarn files
when it receives a StopCluster message. That way, we can be sure that the Yarn
files are deleted only if the cluster is intended to be shut down.
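
In Java terms, the cleanup that the YarnJobManager now performs on
StopCluster amounts to the sketch below. It mirrors the Scala diff further
down using Flink's own file system API (org.apache.flink.core.fs); the
actor message handling, the FLINK_YARN_FILES environment lookup, and proper
logging are omitted for brevity, and the class and method names here are
made up for illustration:

    import java.io.IOException;

    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;

    final class YarnFilesCleanup {

        /**
         * Recursively deletes the Yarn application files under the given
         * path. Failures are only logged, because this runs during cluster
         * shutdown and must not prevent the shutdown from completing.
         */
        static void deleteYarnFiles(String filePath) {
            Path path = new Path(filePath);
            try {
                FileSystem fs = path.getFileSystem();
                // commit [4/8] above additionally checks the boolean result
                fs.delete(path, true);
            } catch (IOException ioe) {
                System.err.println("Could not properly delete yarn "
                    + "application files directory " + filePath + ": " + ioe);
            }
        }
    }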


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/99e15dd8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/99e15dd8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/99e15dd8

Branch: refs/heads/release-1.3
Commit: 99e15dd85aa5907dbe6dad6f27179c568179f1a6
Parents: e045423
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed May 24 17:59:51 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri May 26 16:32:48 2017 +0800

----------------------------------------------------------------------
 .../flink/runtime/jobmanager/JobManager.scala   |  2 +-
 ...CliFrontendYarnAddressConfigurationTest.java |  2 +-
 .../flink/yarn/FlinkYarnSessionCliTest.java     |  2 +-
 .../yarn/AbstractYarnClusterDescriptor.java     | 42 ++++++++------------
 .../apache/flink/yarn/YarnClusterClient.java    | 26 +-----------
 .../org/apache/flink/yarn/YarnConfigKeys.java   |  1 +
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |  4 +-
 .../apache/flink/yarn/ApplicationClient.scala   |  2 +-
 .../org/apache/flink/yarn/YarnJobManager.scala  | 35 ++++++++++++++++
 9 files changed, 60 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/99e15dd8/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 1ea783b..86d9894 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1201,7 +1201,7 @@ class JobManager(
           }
         case None =>
           // ResourceManager not available
-          // we choose not to wait here beacuse it might block the shutdown forever
+          // we choose not to wait here because it might block the shutdown forever
       }
 
       sender() ! decorateMessage(StopClusterSuccessful.getInstance())

http://git-wip-us.apache.org/repos/asf/flink/blob/99e15dd8/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index 6a8c266..2399f47 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.commons.cli.CommandLine;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -390,7 +391,6 @@ public class CliFrontendYarnAddressConfigurationTest {
 						YarnClient yarnClient,
 						ApplicationReport report,
 						Configuration flinkConfiguration,
-						Path sessionFilesDir,
 						boolean perJobCluster) throws IOException, YarnException {
 
 					return Mockito.mock(YarnClusterClient.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/99e15dd8/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index 4da5a39..9351682 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -179,7 +179,7 @@ public class FlinkYarnSessionCliTest {
 				Mockito.mock(YarnClient.class),
 				Mockito.mock(ApplicationReport.class),
 				config,
-				new Path("/temp"), false);
+				false);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/99e15dd8/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 3110a5b..818a3e8 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -107,12 +108,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	private Configuration conf = new YarnConfiguration();
 
 	/**
-	 * Files (usually in a distributed file system) used for the YARN session of Flink.
-	 * Contains configuration files and jar files.
-	 */
-	private Path sessionFilesDir;
-
-	/**
 	 * If the user has specified a different number of slots, we store them here
 	 */
 	private int slots = -1;
@@ -416,7 +411,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			flinkConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, appReport.getHost());
 			flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, appReport.getRpcPort());
 
-			return createYarnClusterClient(this, yarnClient, appReport, flinkConfiguration, sessionFilesDir, false);
+			return createYarnClusterClient(this, yarnClient, appReport, flinkConfiguration, false);
 		} catch (Exception e) {
 			throw new RuntimeException("Couldn't retrieve Yarn cluster", e);
 		}
@@ -583,7 +578,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
 
 		// the Flink cluster is deployed in YARN. Represent cluster
-		return createYarnClusterClient(this, yarnClient, report, flinkConfiguration, sessionFilesDir, true);
+		return createYarnClusterClient(this, yarnClient, report, flinkConfiguration, true);
 	}
 
 	public ApplicationReport startAppMaster(JobGraph jobGraph, YarnClient yarnClient, YarnClientApplication yarnApplication) throws Exception {
@@ -739,10 +734,10 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			}
 		}
 
-		sessionFilesDir = new Path(fs.getHomeDirectory(), ".flink/" + appId.toString() + "/");
+		Path yarnFilesDir = new Path(fs.getHomeDirectory(), ".flink/" + appId + '/');
 
 		FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
-		fs.setPermission(sessionFilesDir, permission); // set permission for path.
+		fs.setPermission(yarnFilesDir, permission); // set permission for path.
 
 		//To support Yarn Secure Integration Test Scenario
 		//In Integration test setup, the Yarn containers created by YarnMiniCluster does not have the Yarn site XML
@@ -812,6 +807,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		appMasterEnv.put(YarnConfigKeys.ENV_SLOTS, String.valueOf(slots));
 		appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached));
 		appMasterEnv.put(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE, getZookeeperNamespace());
+		appMasterEnv.put(YarnConfigKeys.FLINK_YARN_FILES, yarnFilesDir.toUri().toString());
 
 		// https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name
 		appMasterEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName());
@@ -863,7 +859,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		setApplicationTags(appContext);
 
 		// add a hook to clean up in case deployment fails
-		Thread deploymentFailureHook = new DeploymentFailureHook(yarnClient, yarnApplication);
+		Thread deploymentFailureHook = new DeploymentFailureHook(yarnClient, yarnApplication, yarnFilesDir);
 		Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
 		LOG.info("Submitting application master " + appId);
 		yarnClient.submitApplication(appContext);
@@ -1057,10 +1053,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		}
 	}
 
-	public String getSessionFilesDir() {
-		return sessionFilesDir.toString();
-	}
-
 	public void setName(String name) {
 		if(name == null) {
 			throw new IllegalArgumentException("The passed name is null");
@@ -1226,22 +1218,24 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
 	private class DeploymentFailureHook extends Thread {
 
-		DeploymentFailureHook(YarnClient yarnClient, YarnClientApplication yarnApplication) {
-			this.yarnClient = yarnClient;
-			this.yarnApplication = yarnApplication;
-		}
+		private final YarnClient yarnClient;
+		private final YarnClientApplication yarnApplication;
+		private final Path yarnFilesDir;
 
-		private YarnClient yarnClient;
-		private YarnClientApplication yarnApplication;
+		DeploymentFailureHook(YarnClient yarnClient, YarnClientApplication yarnApplication, Path yarnFilesDir) {
+			this.yarnClient = Preconditions.checkNotNull(yarnClient);
+			this.yarnApplication = Preconditions.checkNotNull(yarnApplication);
+			this.yarnFilesDir = Preconditions.checkNotNull(yarnFilesDir);
+		}
 
 		@Override
 		public void run() {
 			LOG.info("Cancelling deployment from Deployment Failure Hook");
 			failSessionDuringDeployment(yarnClient, yarnApplication);
-			LOG.info("Deleting files in " + sessionFilesDir);
+			LOG.info("Deleting files in {}.", yarnFilesDir);
 			try {
 				FileSystem fs = FileSystem.get(conf);
-				fs.delete(sessionFilesDir, true);
+				fs.delete(yarnFilesDir, true);
 				fs.close();
 			} catch (IOException e) {
 				LOG.error("Failed to delete Flink Jar and conf files in HDFS", e);
@@ -1348,14 +1342,12 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			YarnClient yarnClient,
 			ApplicationReport report,
 			org.apache.flink.configuration.Configuration flinkConfiguration,
-			Path sessionFilesDir,
 			boolean perJobCluster) throws Exception {
 		return new YarnClusterClient(
 			descriptor,
 			yarnClient,
 			report,
 			flinkConfiguration,
-			sessionFilesDir,
 			perJobCluster);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/99e15dd8/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index 8f47b18..7042f99 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatus;
@@ -38,9 +39,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -74,9 +72,6 @@ public class YarnClusterClient extends ClusterClient {
 
 	private Thread clientShutdownHook = new ClientShutdownHook();
 	private PollingThread pollingRunner;
-	private final Configuration hadoopConfig;
-	// (HDFS) location of the files required to run on YARN. Needed here to delete them on shutdown.
-	private final Path sessionFilesDir;
 
 	//---------- Class internal fields -------------------
 
@@ -99,7 +94,6 @@ public class YarnClusterClient extends ClusterClient {
 	 * @param yarnClient Client to talk to YARN
 	 * @param appReport the YARN application ID
 	 * @param flinkConfig Flink configuration
-	 * @param sessionFilesDir Location of files required for YARN session
 	 * @param newlyCreatedCluster Indicator whether this cluster has just been created
 	 * @throws IOException
 	 * @throws YarnException
@@ -108,8 +102,7 @@ public class YarnClusterClient extends ClusterClient {
 		final AbstractYarnClusterDescriptor clusterDescriptor,
 		final YarnClient yarnClient,
 		final ApplicationReport appReport,
-		org.apache.flink.configuration.Configuration flinkConfig,
-		Path sessionFilesDir,
+		Configuration flinkConfig,
 		boolean newlyCreatedCluster) throws Exception {
 
 		super(flinkConfig);
@@ -117,8 +110,6 @@ public class YarnClusterClient extends ClusterClient {
 		this.akkaDuration = AkkaUtils.getTimeout(flinkConfig);
 		this.clusterDescriptor = clusterDescriptor;
 		this.yarnClient = yarnClient;
-		this.hadoopConfig = yarnClient.getConfig();
-		this.sessionFilesDir = sessionFilesDir;
 		this.appReport = appReport;
 		this.appId = appReport.getApplicationId();
 		this.trackingURL = appReport.getTrackingUrl();
@@ -391,19 +382,6 @@ public class YarnClusterClient extends ClusterClient {
 			LOG.warn("Exception while deleting the JobManager address file", e);
 		}
 
-		if (sessionFilesDir != null) {
-			LOG.info("Deleting files in " + sessionFilesDir);
-			try {
-				FileSystem shutFS = FileSystem.get(hadoopConfig);
-				shutFS.delete(sessionFilesDir, true); // delete conf and jar file.
-				shutFS.close();
-			} catch (IOException e) {
-				LOG.error("Could not delete the Flink jar and configuration files in HDFS..", e);
-			}
-		} else {
-			LOG.warn("Session file directory not set. Not deleting session files");
-		}
-
 		try {
 			pollingRunner.stopRunner();
 			pollingRunner.join(1000);

http://git-wip-us.apache.org/repos/asf/flink/blob/99e15dd8/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
index ada241c..7c9c7a7 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
@@ -39,6 +39,7 @@ public class YarnConfigKeys {
 	public static final String ENV_FLINK_CLASSPATH = "_FLINK_CLASSPATH";
 
 	public final static String FLINK_JAR_PATH = "_FLINK_JAR_PATH"; // the Flink jar resource location (in HDFS).
+	public final static String FLINK_YARN_FILES = "_FLINK_YARN_FILES"; // the root directory for all yarn application files
 
 	public final static String KEYTAB_PATH = "_KEYTAB_PATH";
 	public final static String KEYTAB_PRINCIPAL = "_KEYTAB_PRINCIPAL";

http://git-wip-us.apache.org/repos/asf/flink/blob/99e15dd8/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 69b472a..1ece264 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -660,9 +660,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 				// print info and quit:
 				LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
 						"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
-						"yarn application -kill " + yarnCluster.getApplicationId() + System.lineSeparator() +
-						"Please also note that the temporary files of the YARN session in {} will not be removed.",
-						yarnDescriptor.getSessionFilesDir());
+						"yarn application -kill " + yarnCluster.getApplicationId());
 				yarnCluster.waitForClusterToBeReady();
 				yarnCluster.disconnect();
 			} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/99e15dd8/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
index 7442503..35d5f56 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
@@ -131,7 +131,7 @@ class ApplicationClient(
       }
 
     case msg: RegisterInfoMessageListenerSuccessful =>
-      // The job manager acts as a proxy between the client and the resource managert
+      // The job manager acts as a proxy between the client and the resource manager
       val jm = sender()
       log.info(s"Successfully registered at the ResourceManager using JobManager $jm")
       yarnJobManager = Some(jm)

http://git-wip-us.apache.org/repos/asf/flink/blob/99e15dd8/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index efb4801..902553f 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -18,12 +18,15 @@
 
 package org.apache.flink.yarn
 
+import java.io.IOException
 import java.util.concurrent.{Executor, ScheduledExecutorService, TimeUnit}
 
 import akka.actor.ActorRef
 import org.apache.flink.configuration.{ConfigConstants, Configuration => FlinkConfiguration}
+import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.clusterframework.ContaineredJobManager
+import org.apache.flink.runtime.clusterframework.messages.StopCluster
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
 import org.apache.flink.runtime.instance.InstanceManager
@@ -89,5 +92,37 @@ class YarnJobManager(
       flinkConfiguration.getInteger(ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, 5),
       TimeUnit.SECONDS)
 
+  val yarnFilesPath: Option[String] = Option(System.getenv().get(YarnConfigKeys.FLINK_YARN_FILES))
+
   override val jobPollingInterval = YARN_HEARTBEAT_DELAY
+
+  override def handleMessage: Receive = {
+    handleYarnShutdown orElse super.handleMessage
+  }
+
+  def handleYarnShutdown: Receive = {
+    case msg:StopCluster =>
+      super.handleMessage(msg)
+
+      // do global cleanup if the yarn files path has been set
+      yarnFilesPath match {
+        case Some(filePath) =>
+          log.info(s"Deleting yarn application files under $filePath.")
+
+          val path = new Path(filePath)
+
+          try {
+            val fs = path.getFileSystem
+            fs.delete(path, true)
+          } catch {
+            case ioe: IOException =>
+              log.warn(
+                s"Could not properly delete yarn application files directory $filePath.",
+                ioe)
+          }
+        case None =>
+          log.debug("No yarn application files directory set. Therefore, cannot clean up " +
+            "the data.")
+      }
+  }
 }


[5/8] flink git commit: [FLINK-6704][yarn] Fix user-jars not being possible to exclude from system class path

Posted by tz...@apache.org.
[FLINK-6704][yarn] Fix user-jars not being possible to exclude from system class path

This closes #3979.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4ae040c1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4ae040c1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4ae040c1

Branch: refs/heads/release-1.3
Commit: 4ae040c141c1eb675883a3857de9163d22556b89
Parents: 2e138f1
Author: zentol <ch...@apache.org>
Authored: Wed May 24 15:08:57 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri May 26 16:33:29 2017 +0800

----------------------------------------------------------------------
 .../org/apache/flink/yarn/AbstractYarnClusterDescriptor.java | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4ae040c1/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 044d1e7..b9a4416 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -671,7 +671,13 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
 		// upload and register ship files	
 		List<String> systemClassPaths = uploadAndRegisterFiles(systemShipFiles, fs, appId.toString(), paths, localResources, envShipFileList);
-		List<String> userClassPaths = uploadAndRegisterFiles(userJarFiles, fs, appId.toString(), paths, localResources, envShipFileList);
+
+		List<String> userClassPaths;
+		if (userJarInclusion != YarnConfigOptions.UserJarInclusion.DISABLED) {
+			userClassPaths = uploadAndRegisterFiles(userJarFiles, fs, appId.toString(), paths, localResources, envShipFileList);
+		} else {
+			userClassPaths = Collections.emptyList();
+		}
 
 		if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
 			systemClassPaths.addAll(userClassPaths);