Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:16 UTC

[26/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
new file mode 100644
index 0000000..dbf95f9
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis;
+
+import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
+import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
+import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Suite of FlinkKinesisConsumer tests for the methods called throughout the source life cycle.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({FlinkKinesisConsumer.class, KinesisConfigUtil.class})
+public class FlinkKinesisConsumerTest {
+
+	@Rule
+	public ExpectedException exception = ExpectedException.none(); // JUnit requires @Rule fields to be public
+
+	// ----------------------------------------------------------------------
+	// KinesisConfigUtil.validateAwsConfiguration() tests
+	// ----------------------------------------------------------------------
+
+	@Test
+	public void testMissingAwsRegionInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("The AWS region ('" + AWSConfigConstants.AWS_REGION + "') must be set in the config.");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKey");
+		testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+		KinesisConfigUtil.validateAwsConfiguration(testConfig);
+	}
+
+	@Test
+	public void testUnrecognizableAwsRegionInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid AWS region");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "wrongRegionId");
+		testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+		KinesisConfigUtil.validateAwsConfiguration(testConfig);
+	}
+
+	@Test
+	public void testCredentialProviderTypeDefaultToBasicButNoCredentialsSetInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Please set values for AWS Access Key ID ('" + AWSConfigConstants.AWS_ACCESS_KEY_ID + "') " +
+				"and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+
+		KinesisConfigUtil.validateAwsConfiguration(testConfig);
+	}
+
+	@Test
+	public void testCredentialProviderTypeSetToBasicButNoCredentialSetInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Please set values for AWS Access Key ID ('" + AWSConfigConstants.AWS_ACCESS_KEY_ID + "') " +
+				"and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+
+		KinesisConfigUtil.validateAwsConfiguration(testConfig);
+	}
+
+	@Test
+	public void testUnrecognizableCredentialProviderTypeInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid AWS Credential Provider Type");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "wrongProviderType");
+		testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+		KinesisConfigUtil.validateAwsConfiguration(testConfig);
+	}
+
+	// ----------------------------------------------------------------------
+	// KinesisConfigUtil.validateConsumerConfiguration() tests
+	// ----------------------------------------------------------------------
+
+	@Test
+	public void testUnrecognizableStreamInitPositionTypeInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid initial position in stream");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "wrongInitPosition");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testUnparsableLongForDescribeStreamBackoffBaseMillisInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for describe stream operation base backoff milliseconds");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, "unparsableLong");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testUnparsableLongForDescribeStreamBackoffMaxMillisInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for describe stream operation max backoff milliseconds");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, "unparsableLong");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testUnparsableDoubleForDescribeStreamBackoffExponentialConstantInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for describe stream operation backoff exponential constant");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testUnparsableIntForGetRecordsRetriesInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for maximum retry attempts for getRecords shard operation");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "unparsableInt");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testUnparsableIntForGetRecordsMaxCountInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for maximum records per getRecords shard operation");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "unparsableInt");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testUnparsableLongForGetRecordsBackoffBaseMillisInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for get records operation base backoff milliseconds");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, "unparsableLong");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testUnparsableLongForGetRecordsBackoffMaxMillisInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for get records operation max backoff milliseconds");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, "unparsableLong");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testUnparsableDoubleForGetRecordsBackoffExponentialConstantInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for get records operation backoff exponential constant");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testUnparsableLongForGetRecordsIntervalMillisInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for getRecords sleep interval in milliseconds");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "unparsableLong");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testUnparsableIntForGetShardIteratorRetriesInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for maximum retry attempts for getShardIterator shard operation");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES, "unparsableInt");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testUnparsableLongForGetShardIteratorBackoffBaseMillisInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for get shard iterator operation base backoff milliseconds");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE, "unparsableLong");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testUnparsableLongForGetShardIteratorBackoffMaxMillisInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for get shard iterator operation max backoff milliseconds");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX, "unparsableLong");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testUnparsableDoubleForGetShardIteratorBackoffExponentialConstantInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for get shard iterator operation backoff exponential constant");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testUnparsableLongForShardDiscoveryIntervalMillisInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for shard discovery sleep interval in milliseconds");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "unparsableLong");
+
+		KinesisConfigUtil.validateConsumerConfiguration(testConfig);
+	}
+
+	// ----------------------------------------------------------------------
+	// KinesisConfigUtil.validateProducerConfiguration() tests
+	// ----------------------------------------------------------------------
+
+	@Test
+	public void testUnparsableLongForCollectionMaxCountInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for maximum number of items to pack into a PutRecords request");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ProducerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ProducerConfigConstants.COLLECTION_MAX_COUNT, "unparsableLong");
+
+		KinesisConfigUtil.validateProducerConfiguration(testConfig);
+	}
+
+	@Test
+	public void testUnparsableLongForAggregationMaxCountInConfig() {
+		exception.expect(IllegalArgumentException.class);
+		exception.expectMessage("Invalid value given for maximum number of items to pack into an aggregated record");
+
+		Properties testConfig = new Properties();
+		testConfig.setProperty(ProducerConfigConstants.AWS_REGION, "us-east-1");
+		testConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		testConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+		testConfig.setProperty(ProducerConfigConstants.AGGREGATION_MAX_COUNT, "unparsableLong");
+
+		KinesisConfigUtil.validateProducerConfiguration(testConfig);
+	}
+
+	// ----------------------------------------------------------------------
+	// Tests related to state initialization
+	// ----------------------------------------------------------------------
+
+	@Test
+	public void testSnapshotStateShouldBeNullIfSourceNotOpened() throws Exception {
+		Properties config = new Properties();
+		config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+		config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+		FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config);
+
+		assertTrue(consumer.snapshotState(123, 123) == null); // arbitrary checkpoint id and timestamp
+	}
+
+	@Test
+	public void testSnapshotStateShouldBeNullIfSourceNotRun() throws Exception {
+		Properties config = new Properties();
+		config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+		config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+		config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+		FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config);
+		consumer.open(new Configuration()); // only opened, not run
+
+		assertTrue(consumer.snapshotState(123, 123) == null); // arbitrary checkpoint id and timestamp
+	}
+
+	// ----------------------------------------------------------------------
+	// Tests related to fetcher initialization
+	// ----------------------------------------------------------------------
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testFetcherShouldNotBeRestoringFromFailureIfNotRestoringFromCheckpoint() throws Exception {
+		KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
+		PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
+
+		// assume the given config is correct
+		PowerMockito.mockStatic(KinesisConfigUtil.class);
+		PowerMockito.doNothing().when(KinesisConfigUtil.class);
+
+		TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
+			"fakeStream", new Properties(), 10, 2);
+		consumer.open(new Configuration());
+		consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
+
+		Mockito.verify(mockedFetcher).setIsRestoringFromFailure(false);
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testFetcherShouldBeCorrectlySeededIfRestoringFromCheckpoint() throws Exception {
+		KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
+		PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
+
+		// assume the given config is correct
+		PowerMockito.mockStatic(KinesisConfigUtil.class);
+		PowerMockito.doNothing().when(KinesisConfigUtil.class);
+
+		HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = new HashMap<>();
+		fakeRestoredState.put(
+			new KinesisStreamShard("fakeStream1",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
+			new SequenceNumber(UUID.randomUUID().toString()));
+		fakeRestoredState.put(
+			new KinesisStreamShard("fakeStream1",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
+			new SequenceNumber(UUID.randomUUID().toString()));
+		fakeRestoredState.put(
+			new KinesisStreamShard("fakeStream1",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
+			new SequenceNumber(UUID.randomUUID().toString()));
+		fakeRestoredState.put(
+			new KinesisStreamShard("fakeStream2",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
+			new SequenceNumber(UUID.randomUUID().toString()));
+		fakeRestoredState.put(
+			new KinesisStreamShard("fakeStream2",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
+			new SequenceNumber(UUID.randomUUID().toString()));
+
+		TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
+			"fakeStream", new Properties(), 10, 2);
+		consumer.restoreState(fakeRestoredState);
+		consumer.open(new Configuration());
+		consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
+
+		Mockito.verify(mockedFetcher).setIsRestoringFromFailure(true);
+		for (Map.Entry<KinesisStreamShard, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
+			Mockito.verify(mockedFetcher).advanceLastDiscoveredShardOfStream(
+				restoredShard.getKey().getStreamName(), restoredShard.getKey().getShard().getShardId());
+			Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
+				new KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue()));
+		}
+	}
+}
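
A minimal configuration that passes all of the validations exercised above would look
roughly like the following sketch (the property values and the stream name "myStream"
are illustrative placeholders, not part of this commit):

    Properties config = new Properties();
    config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
    config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
    config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
    // optional; the value must name a valid position such as "LATEST" or "TRIM_HORIZON",
    // otherwise validateConsumerConfiguration() rejects it as an invalid initial position
    config.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");

    FlinkKinesisConsumer<String> consumer =
        new FlinkKinesisConsumer<>("myStream", new SimpleStringSchema(), config);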

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
new file mode 100644
index 0000000..e79f9b1
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -0,0 +1,510 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
+import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisBehavioursFactory;
+import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
+import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDataFetcher;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertTrue;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(TestableKinesisDataFetcher.class)
+public class KinesisDataFetcherTest {
+
+	@Test(expected = RuntimeException.class)
+	public void testIfNoShardsAreFoundShouldThrowException() throws Exception {
+		List<String> fakeStreams = new LinkedList<>();
+		fakeStreams.add("fakeStream1");
+		fakeStreams.add("fakeStream2");
+
+		HashMap<String, String> subscribedStreamsToLastSeenShardIdsUnderTest =
+			KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(fakeStreams);
+
+		TestableKinesisDataFetcher fetcher =
+			new TestableKinesisDataFetcher(
+				fakeStreams,
+				new Properties(),
+				10,
+				2,
+				new AtomicReference<Throwable>(),
+				new LinkedList<KinesisStreamShardState>(),
+				subscribedStreamsToLastSeenShardIdsUnderTest,
+				FakeKinesisBehavioursFactory.noShardsFoundForRequestedStreamsBehaviour());
+
+		fetcher.setIsRestoringFromFailure(false); // not restoring
+
+		fetcher.runFetcher(); // this should throw RuntimeException
+	}
+
+	@Test
+	public void testStreamToLastSeenShardStateIsCorrectlySetWhenNotRestoringFromFailure() throws Exception {
+		List<String> fakeStreams = new LinkedList<>();
+		fakeStreams.add("fakeStream1");
+		fakeStreams.add("fakeStream2");
+		fakeStreams.add("fakeStream3");
+		fakeStreams.add("fakeStream4");
+
+		HashMap<String, String> subscribedStreamsToLastSeenShardIdsUnderTest =
+			KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(fakeStreams);
+
+		Map<String,Integer> streamToShardCount = new HashMap<>();
+		Random rand = new Random();
+		for (String fakeStream : fakeStreams) {
+			streamToShardCount.put(fakeStream, rand.nextInt(5)+1);
+		}
+
+		final TestableKinesisDataFetcher fetcher =
+			new TestableKinesisDataFetcher(
+				fakeStreams,
+				new Properties(),
+				10,
+				2,
+				new AtomicReference<Throwable>(),
+				new LinkedList<KinesisStreamShardState>(),
+				subscribedStreamsToLastSeenShardIdsUnderTest,
+				FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(streamToShardCount));
+
+		fetcher.setIsRestoringFromFailure(false);
+
+		PowerMockito.whenNew(ShardConsumer.class).withAnyArguments().thenReturn(Mockito.mock(ShardConsumer.class));
+		Thread runFetcherThread = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				try {
+					fetcher.runFetcher();
+				} catch (Exception e) {
+					// ignored; the test asserts on the shared state maps after shutdownFetcher()
+				}
+			}
+		});
+		runFetcherThread.start();
+		Thread.sleep(1000); // sleep a while before closing
+		fetcher.shutdownFetcher();
+
+		// assert that the streams tracked in the state are identical to the subscribed streams
+		Set<String> streamsInState = subscribedStreamsToLastSeenShardIdsUnderTest.keySet();
+		assertTrue(streamsInState.size() == fakeStreams.size());
+		assertTrue(streamsInState.containsAll(fakeStreams));
+
+		// assert that the last seen shards in state are correctly set
+		for (Map.Entry<String,String> streamToLastSeenShard : subscribedStreamsToLastSeenShardIdsUnderTest.entrySet()) {
+			assertTrue(
+				streamToLastSeenShard.getValue().equals(
+					KinesisShardIdGenerator.generateFromShardOrder(streamToShardCount.get(streamToLastSeenShard.getKey())-1)));
+		}
+	}
+
+	@Test
+	public void testStreamToLastSeenShardStateIsCorrectlySetWhenNoNewShardsSinceRestoredCheckpoint() throws Exception {
+		List<String> fakeStreams = new LinkedList<>();
+		fakeStreams.add("fakeStream1");
+		fakeStreams.add("fakeStream2");
+
+		Map<KinesisStreamShard, String> restoredStateUnderTest = new HashMap<>();
+
+		// fakeStream1 has 3 shards before restore
+		restoredStateUnderTest.put(
+			new KinesisStreamShard(
+				"fakeStream1",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
+			UUID.randomUUID().toString());
+		restoredStateUnderTest.put(
+			new KinesisStreamShard(
+				"fakeStream1",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
+			UUID.randomUUID().toString());
+		restoredStateUnderTest.put(
+			new KinesisStreamShard(
+				"fakeStream1",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
+			UUID.randomUUID().toString());
+
+		// fakeStream2 has 2 shards before restore
+		restoredStateUnderTest.put(
+			new KinesisStreamShard(
+				"fakeStream2",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
+			UUID.randomUUID().toString());
+		restoredStateUnderTest.put(
+			new KinesisStreamShard(
+				"fakeStream2",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
+			UUID.randomUUID().toString());
+
+		Map<String,Integer> streamToShardCount = new HashMap<>();
+		streamToShardCount.put("fakeStream1", 3); // fakeStream1 will still have 3 shards after restore
+		streamToShardCount.put("fakeStream2", 2); // fakeStream2 will still have 2 shards after restore
+
+		HashMap<String, String> subscribedStreamsToLastSeenShardIdsUnderTest =
+			KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(fakeStreams);
+
+		final TestableKinesisDataFetcher fetcher =
+			new TestableKinesisDataFetcher(
+				fakeStreams,
+				new Properties(),
+				10,
+				2,
+				new AtomicReference<Throwable>(),
+				new LinkedList<KinesisStreamShardState>(),
+				subscribedStreamsToLastSeenShardIdsUnderTest,
+				FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(streamToShardCount));
+
+		for (Map.Entry<KinesisStreamShard, String> restoredState : restoredStateUnderTest.entrySet()) {
+			fetcher.advanceLastDiscoveredShardOfStream(restoredState.getKey().getStreamName(), restoredState.getKey().getShard().getShardId());
+			fetcher.registerNewSubscribedShardState(
+				new KinesisStreamShardState(restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
+		}
+
+		fetcher.setIsRestoringFromFailure(true);
+
+		PowerMockito.whenNew(ShardConsumer.class).withAnyArguments().thenReturn(Mockito.mock(ShardConsumer.class));
+		Thread runFetcherThread = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				try {
+					fetcher.runFetcher();
+				} catch (Exception e) {
+					// ignored; the test asserts on the shared state maps after shutdownFetcher()
+				}
+			}
+		});
+		runFetcherThread.start();
+		Thread.sleep(1000); // sleep a while before closing
+		fetcher.shutdownFetcher();
+
+		// assert that the streams tracked in the state are identical to the subscribed streams
+		Set<String> streamsInState = subscribedStreamsToLastSeenShardIdsUnderTest.keySet();
+		assertTrue(streamsInState.size() == fakeStreams.size());
+		assertTrue(streamsInState.containsAll(fakeStreams));
+
+		// assert that the last seen shards in state are correctly set
+		for (Map.Entry<String,String> streamToLastSeenShard : subscribedStreamsToLastSeenShardIdsUnderTest.entrySet()) {
+			assertTrue(
+				streamToLastSeenShard.getValue().equals(
+					KinesisShardIdGenerator.generateFromShardOrder(streamToShardCount.get(streamToLastSeenShard.getKey())-1)));
+		}
+	}
+
+	@Test
+	public void testStreamToLastSeenShardStateIsCorrectlySetWhenNewShardsFoundSinceRestoredCheckpoint() throws Exception {
+		List<String> fakeStreams = new LinkedList<>();
+		fakeStreams.add("fakeStream1");
+		fakeStreams.add("fakeStream2");
+
+		Map<KinesisStreamShard, String> restoredStateUnderTest = new HashMap<>();
+
+		// fakeStream1 has 3 shards before restore
+		restoredStateUnderTest.put(
+			new KinesisStreamShard(
+				"fakeStream1",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
+			UUID.randomUUID().toString());
+		restoredStateUnderTest.put(
+			new KinesisStreamShard(
+				"fakeStream1",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
+			UUID.randomUUID().toString());
+		restoredStateUnderTest.put(
+			new KinesisStreamShard(
+				"fakeStream1",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
+			UUID.randomUUID().toString());
+
+		// fakeStream2 has 2 shards before restore
+		restoredStateUnderTest.put(
+			new KinesisStreamShard(
+				"fakeStream2",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
+			UUID.randomUUID().toString());
+		restoredStateUnderTest.put(
+			new KinesisStreamShard(
+				"fakeStream2",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
+			UUID.randomUUID().toString());
+
+		Map<String,Integer> streamToShardCount = new HashMap<>();
+		streamToShardCount.put("fakeStream1", 3+1); // fakeStream1 had 3 shards before & 1 new shard after restore
+		streamToShardCount.put("fakeStream2", 2+3); // fakeStream2 had 2 shards before & 3 new shards after restore
+
+		HashMap<String, String> subscribedStreamsToLastSeenShardIdsUnderTest =
+			KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(fakeStreams);
+
+		// using a non-resharded streams kinesis behaviour to represent that Kinesis is not resharded AFTER the restore
+		final TestableKinesisDataFetcher fetcher =
+			new TestableKinesisDataFetcher(
+				fakeStreams,
+				new Properties(),
+				10,
+				2,
+				new AtomicReference<Throwable>(),
+				new LinkedList<KinesisStreamShardState>(),
+				subscribedStreamsToLastSeenShardIdsUnderTest,
+				FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(streamToShardCount));
+
+		for (Map.Entry<KinesisStreamShard, String> restoredState : restoredStateUnderTest.entrySet()) {
+			fetcher.advanceLastDiscoveredShardOfStream(restoredState.getKey().getStreamName(), restoredState.getKey().getShard().getShardId());
+			fetcher.registerNewSubscribedShardState(
+				new KinesisStreamShardState(restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
+		}
+
+		fetcher.setIsRestoringFromFailure(true);
+
+		PowerMockito.whenNew(ShardConsumer.class).withAnyArguments().thenReturn(Mockito.mock(ShardConsumer.class));
+		Thread runFetcherThread = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				try {
+					fetcher.runFetcher();
+				} catch (Exception e) {
+					// ignored; the test asserts on the shared state maps after shutdownFetcher()
+				}
+			}
+		});
+		runFetcherThread.start();
+		Thread.sleep(1000); // sleep a while before closing
+		fetcher.shutdownFetcher();
+
+		// assert that the streams tracked in the state are identical to the subscribed streams
+		Set<String> streamsInState = subscribedStreamsToLastSeenShardIdsUnderTest.keySet();
+		assertTrue(streamsInState.size() == fakeStreams.size());
+		assertTrue(streamsInState.containsAll(fakeStreams));
+
+		// assert that the last seen shards in state are correctly set
+		for (Map.Entry<String,String> streamToLastSeenShard : subscribedStreamsToLastSeenShardIdsUnderTest.entrySet()) {
+			assertTrue(
+				streamToLastSeenShard.getValue().equals(
+					KinesisShardIdGenerator.generateFromShardOrder(streamToShardCount.get(streamToLastSeenShard.getKey())-1)));
+		}
+	}
+
+	@Test
+	public void testStreamToLastSeenShardStateIsCorrectlySetWhenNoNewShardsSinceRestoredCheckpointAndSomeStreamsDoNotExist() throws Exception {
+		List<String> fakeStreams = new LinkedList<>();
+		fakeStreams.add("fakeStream1");
+		fakeStreams.add("fakeStream2");
+		fakeStreams.add("fakeStream3"); // fakeStream3 will not have any shards
+		fakeStreams.add("fakeStream4"); // fakeStream4 will not have any shards
+
+		Map<KinesisStreamShard, String> restoredStateUnderTest = new HashMap<>();
+
+		// fakeStream1 has 3 shards before restore
+		restoredStateUnderTest.put(
+			new KinesisStreamShard(
+				"fakeStream1",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
+			UUID.randomUUID().toString());
+		restoredStateUnderTest.put(
+			new KinesisStreamShard(
+				"fakeStream1",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
+			UUID.randomUUID().toString());
+		restoredStateUnderTest.put(
+			new KinesisStreamShard(
+				"fakeStream1",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
+			UUID.randomUUID().toString());
+
+		// fakeStream2 has 2 shards before restore
+		restoredStateUnderTest.put(
+			new KinesisStreamShard(
+				"fakeStream2",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
+			UUID.randomUUID().toString());
+		restoredStateUnderTest.put(
+			new KinesisStreamShard(
+				"fakeStream2",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
+			UUID.randomUUID().toString());
+
+		Map<String,Integer> streamToShardCount = new HashMap<>();
+		streamToShardCount.put("fakeStream1", 3); // fakeStream1 has fixed 3 shards
+		streamToShardCount.put("fakeStream2", 2); // fakeStream2 has fixed 2 shards
+		streamToShardCount.put("fakeStream3", 0); // no shards can be found for fakeStream3
+		streamToShardCount.put("fakeStream4", 0); // no shards can be found for fakeStream4
+
+		HashMap<String, String> subscribedStreamsToLastSeenShardIdsUnderTest =
+			KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(fakeStreams);
+
+		// using a non-resharded streams kinesis behaviour to represent that Kinesis is not resharded AFTER the restore
+		final TestableKinesisDataFetcher fetcher =
+			new TestableKinesisDataFetcher(
+				fakeStreams,
+				new Properties(),
+				10,
+				2,
+				new AtomicReference<Throwable>(),
+				new LinkedList<KinesisStreamShardState>(),
+				subscribedStreamsToLastSeenShardIdsUnderTest,
+				FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(streamToShardCount));
+
+		for (Map.Entry<KinesisStreamShard, String> restoredState : restoredStateUnderTest.entrySet()) {
+			fetcher.advanceLastDiscoveredShardOfStream(restoredState.getKey().getStreamName(), restoredState.getKey().getShard().getShardId());
+			fetcher.registerNewSubscribedShardState(
+				new KinesisStreamShardState(restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
+		}
+
+		fetcher.setIsRestoringFromFailure(true);
+
+		PowerMockito.whenNew(ShardConsumer.class).withAnyArguments().thenReturn(Mockito.mock(ShardConsumer.class));
+		Thread runFetcherThread = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				try {
+					fetcher.runFetcher();
+				} catch (Exception e) {
+					// ignored; the test asserts on the shared state maps after shutdownFetcher()
+				}
+			}
+		});
+		runFetcherThread.start();
+		Thread.sleep(1000); // sleep a while before closing
+		fetcher.shutdownFetcher();
+
+		// assert that the streams tracked in the state are identical to the subscribed streams
+		Set<String> streamsInState = subscribedStreamsToLastSeenShardIdsUnderTest.keySet();
+		assertTrue(streamsInState.size() == fakeStreams.size());
+		assertTrue(streamsInState.containsAll(fakeStreams));
+
+		// assert that the last seen shards in state are correctly set
+		assertTrue(subscribedStreamsToLastSeenShardIdsUnderTest.get("fakeStream1").equals(
+			KinesisShardIdGenerator.generateFromShardOrder(2)));
+		assertTrue(subscribedStreamsToLastSeenShardIdsUnderTest.get("fakeStream2").equals(
+			KinesisShardIdGenerator.generateFromShardOrder(1)));
+		assertTrue(subscribedStreamsToLastSeenShardIdsUnderTest.get("fakeStream3") == null);
+		assertTrue(subscribedStreamsToLastSeenShardIdsUnderTest.get("fakeStream4") == null);
+	}
+
+	@Test
+	public void testStreamToLastSeenShardStateIsCorrectlySetWhenNewShardsFoundSinceRestoredCheckpointAndSomeStreamsDoNotExist() throws Exception {
+		List<String> fakeStreams = new LinkedList<>();
+		fakeStreams.add("fakeStream1");
+		fakeStreams.add("fakeStream2");
+		fakeStreams.add("fakeStream3"); // fakeStream3 will not have any shards
+		fakeStreams.add("fakeStream4"); // fakeStream4 will not have any shards
+
+		Map<KinesisStreamShard, String> restoredStateUnderTest = new HashMap<>();
+
+		// fakeStream1 has 3 shards before restore
+		restoredStateUnderTest.put(
+			new KinesisStreamShard(
+				"fakeStream1",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
+			UUID.randomUUID().toString());
+		restoredStateUnderTest.put(
+			new KinesisStreamShard(
+				"fakeStream1",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
+			UUID.randomUUID().toString());
+		restoredStateUnderTest.put(
+			new KinesisStreamShard(
+				"fakeStream1",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
+			UUID.randomUUID().toString());
+
+		// fakeStream2 has 2 shards before restore
+		restoredStateUnderTest.put(
+			new KinesisStreamShard(
+				"fakeStream2",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
+			UUID.randomUUID().toString());
+		restoredStateUnderTest.put(
+			new KinesisStreamShard(
+				"fakeStream2",
+				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
+			UUID.randomUUID().toString());
+
+		Map<String,Integer> streamToShardCount = new HashMap<>();
+		streamToShardCount.put("fakeStream1", 3+1); // fakeStream1 had 3 shards before & 1 new shard after restore
+		streamToShardCount.put("fakeStream2", 2+3); // fakeStream2 had 2 shards before & 3 new shards after restore
+		streamToShardCount.put("fakeStream3", 0); // no shards can be found for fakeStream3
+		streamToShardCount.put("fakeStream4", 0); // no shards can be found for fakeStream4
+
+		HashMap<String, String> subscribedStreamsToLastSeenShardIdsUnderTest =
+			KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(fakeStreams);
+
+		// using a non-resharded streams kinesis behaviour to represent that Kinesis is not resharded AFTER the restore
+		final TestableKinesisDataFetcher fetcher =
+			new TestableKinesisDataFetcher(
+				fakeStreams,
+				new Properties(),
+				10,
+				2,
+				new AtomicReference<Throwable>(),
+				new LinkedList<KinesisStreamShardState>(),
+				subscribedStreamsToLastSeenShardIdsUnderTest,
+				FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(streamToShardCount));
+
+		for (Map.Entry<KinesisStreamShard, String> restoredState : restoredStateUnderTest.entrySet()) {
+			fetcher.advanceLastDiscoveredShardOfStream(restoredState.getKey().getStreamName(), restoredState.getKey().getShard().getShardId());
+			fetcher.registerNewSubscribedShardState(
+				new KinesisStreamShardState(restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
+		}
+
+		fetcher.setIsRestoringFromFailure(true);
+
+		PowerMockito.whenNew(ShardConsumer.class).withAnyArguments().thenReturn(Mockito.mock(ShardConsumer.class));
+		Thread runFetcherThread = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				try {
+					fetcher.runFetcher();
+				} catch (Exception e) {
+					// ignored; the test asserts on the shared state maps after shutdownFetcher()
+				}
+			}
+		});
+		runFetcherThread.start();
+		Thread.sleep(1000); // sleep a while before closing
+		fetcher.shutdownFetcher();
+
+		// assert that the streams tracked in the state are identical to the subscribed streams
+		Set<String> streamsInState = subscribedStreamsToLastSeenShardIdsUnderTest.keySet();
+		assertTrue(streamsInState.size() == fakeStreams.size());
+		assertTrue(streamsInState.containsAll(fakeStreams));
+
+		// assert that the last seen shards in state are correctly set
+		assertTrue(subscribedStreamsToLastSeenShardIdsUnderTest.get("fakeStream1").equals(
+			KinesisShardIdGenerator.generateFromShardOrder(3)));
+		assertTrue(subscribedStreamsToLastSeenShardIdsUnderTest.get("fakeStream2").equals(
+			KinesisShardIdGenerator.generateFromShardOrder(4)));
+		assertTrue(subscribedStreamsToLastSeenShardIdsUnderTest.get("fakeStream3") == null);
+		assertTrue(subscribedStreamsToLastSeenShardIdsUnderTest.get("fakeStream4") == null);
+	}
+}
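
Every positive test above repeats the same pattern around runFetcher(): start it on a
background thread, give shard discovery a moment to run, then shut the fetcher down and
assert on the shared state maps. A sketch of that pattern as a helper (hypothetical;
runFetcherBriefly() is not part of this commit):

    private static void runFetcherBriefly(final TestableKinesisDataFetcher fetcher) throws InterruptedException {
        Thread runFetcherThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    fetcher.runFetcher();
                } catch (Exception e) {
                    // ignored; the caller asserts on the shared state after shutdown
                }
            }
        });
        runFetcherThread.start();
        Thread.sleep(1000); // give shard discovery a chance to run at least once
        fetcher.shutdownFetcher();
    }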

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
new file mode 100644
index 0000000..96764a4
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import com.amazonaws.services.kinesis.model.HashKeyRange;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.commons.lang.StringUtils;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
+import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisBehavioursFactory;
+import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
+import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDataFetcher;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.math.BigInteger;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertTrue;
+
+public class ShardConsumerTest {
+
+	@Test
+	public void testCorrectNumOfCollectedRecordsAndUpdatedState() {
+		KinesisStreamShard fakeToBeConsumedShard = new KinesisStreamShard(
+			"fakeStream",
+			new Shard()
+				.withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))
+				.withHashKeyRange(
+					new HashKeyRange()
+						.withStartingHashKey("0")
+						.withEndingHashKey(new BigInteger(StringUtils.repeat("FF", 16), 16).toString())));
+
+		LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
+		subscribedShardsStateUnderTest.add(
+			new KinesisStreamShardState(fakeToBeConsumedShard, new SequenceNumber("fakeStartingState")));
+
+		TestableKinesisDataFetcher fetcher =
+			new TestableKinesisDataFetcher(
+				Collections.singletonList("fakeStream"),
+				new Properties(),
+				10,
+				2,
+				new AtomicReference<Throwable>(),
+				subscribedShardsStateUnderTest,
+				KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
+				Mockito.mock(KinesisProxyInterface.class));
+
+		new ShardConsumer<>(
+			fetcher,
+			0,
+			subscribedShardsStateUnderTest.get(0).getKinesisStreamShard(),
+			subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
+			FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(1000, 9)).run();
+
+		assertTrue(fetcher.getNumOfElementsCollected() == 1000);
+		assertTrue(subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum().equals(
+			SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()));
+	}
+
+	@Test
+	public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithUnexpectedExpiredIterator() {
+		KinesisStreamShard fakeToBeConsumedShard = new KinesisStreamShard(
+			"fakeStream",
+			new Shard()
+				.withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))
+				.withHashKeyRange(
+					new HashKeyRange()
+						.withStartingHashKey("0")
+						.withEndingHashKey(new BigInteger(StringUtils.repeat("FF", 16), 16).toString())));
+
+		LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
+		subscribedShardsStateUnderTest.add(
+			new KinesisStreamShardState(fakeToBeConsumedShard, new SequenceNumber("fakeStartingState")));
+
+		TestableKinesisDataFetcher fetcher =
+			new TestableKinesisDataFetcher(
+				Collections.singletonList("fakeStream"),
+				new Properties(),
+				10,
+				2,
+				new AtomicReference<Throwable>(),
+				subscribedShardsStateUnderTest,
+				KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
+				Mockito.mock(KinesisProxyInterface.class));
+
+		new ShardConsumer<>(
+			fetcher,
+			0,
+			subscribedShardsStateUnderTest.get(0).getKinesisStreamShard(),
+			subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
+			// Get a total of 1000 records with 9 getRecords() calls,
+			// and the 7th getRecords() call will encounter an unexpected expired shard iterator
+			FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCallsWithUnexpectedExpiredIterator(1000, 9, 7)).run();
+
+		assertTrue(fetcher.getNumOfElementsCollected() == 1000);
+		assertTrue(subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum().equals(
+			SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()));
+	}
+
+}
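
Both tests build the same fixture: one shard whose hash key range covers the entire
128-bit key space ("FF" repeated 16 times is 2^128 - 1, the maximum hash key). A sketch
of that fixture extracted into a shared helper (hypothetical; not part of this commit):

    private static KinesisStreamShard fakeShardCoveringAllHashKeys() {
        return new KinesisStreamShard(
            "fakeStream",
            new Shard()
                .withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))
                .withHashKeyRange(new HashKeyRange()
                    .withStartingHashKey("0")
                    // maximum 128-bit hash key, so the shard spans the whole key space
                    .withEndingHashKey(new BigInteger(StringUtils.repeat("FF", 16), 16).toString())));
    }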

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
new file mode 100644
index 0000000..6e02a55
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis.manualtests;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.examples.ProduceIntoKinesis;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.flink.util.Collector;
+
+import java.nio.ByteBuffer;
+import java.util.Properties;
+
+/**
+ * This is a manual test for the AWS Kinesis connector in Flink.
+ *
+ * It uses:
+ *  - A custom KinesisSerializationSchema
+ *  - A custom KinesisPartitioner
+ *
+ * Invocation:
+ * --region eu-central-1 --accessKey XXXXXXXXXXXX --secretKey XXXXXXXXXXXXXXXX
+ */
+public class ManualConsumerProducerTest {
+
+	public static void main(String[] args) throws Exception {
+		ParameterTool pt = ParameterTool.fromArgs(args);
+
+		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+		see.setParallelism(4);
+
+		DataStream<String> simpleStringStream = see.addSource(new ProduceIntoKinesis.EventsGenerator());
+
+		Properties kinesisProducerConfig = new Properties();
+		kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_REGION, pt.getRequired("region"));
+		kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
+		kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey"));
+
+		FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(
+				new KinesisSerializationSchema<String>() {
+					@Override
+					public ByteBuffer serialize(String element) {
+						return ByteBuffer.wrap(element.getBytes());
+					}
+
+					// every 10th element goes into a different stream
+					@Override
+					public String getTargetStream(String element) {
+						if (element.split("-")[0].endsWith("0")) {
+							return "flink-test-2";
+						}
+						return null; // send to default stream
+					}
+				},
+				kinesisProducerConfig
+		);
+
+		kinesis.setFailOnError(true);
+		kinesis.setDefaultStream("test-flink");
+		kinesis.setDefaultPartition("0");
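+		// partition by the last character of each element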
+		kinesis.setCustomPartitioner(new KinesisPartitioner<String>() {
+			@Override
+			public String getPartitionId(String element) {
+				int l = element.length();
+				return element.substring(l - 1, l);
+			}
+		});
+		simpleStringStream.addSink(kinesis);
+
+		// consuming topology
+		Properties consumerProps = new Properties();
+		consumerProps.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
+		consumerProps.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey"));
+		consumerProps.setProperty(ConsumerConfigConstants.AWS_REGION, pt.getRequired("region"));
+		DataStream<String> consuming = see.addSource(new FlinkKinesisConsumer<>("test-flink", new SimpleStringSchema(), consumerProps));
+		// validate consumed records for correctness
+		consuming.flatMap(new FlatMapFunction<String, String>() {
+			@Override
+			public void flatMap(String value, Collector<String> out) throws Exception {
+				String[] parts = value.split("-");
+				try {
+					long l = Long.parseLong(parts[0]);
+					if (l < 0) {
+						throw new RuntimeException("Negative");
+					}
+				} catch (NumberFormatException nfe) {
+					throw new RuntimeException("First part of '" + value + "' is not a valid number");
+				}
+				if (parts[1].length() != 12) {
+					throw new RuntimeException("Second part of '" + value + "' doesn't have 12 characters");
+				}
+			}
+		});
+		consuming.print();
+
+		see.execute();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
new file mode 100644
index 0000000..2e452c1
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis.manualtests;
+
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValidatingConsumerThread;
+import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisEventsGeneratorProducerThread;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This test first starts a data generator, producing data into Kinesis.
+ * Then, it starts a consuming topology, ensuring that all records up to a certain
+ * point have been seen.
+ *
+ * Invocation:
+ * --region eu-central-1 --accessKey XXXXXXXXXXXX --secretKey XXXXXXXXXXXXXXXX
+ */
+public class ManualExactlyOnceTest {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ManualExactlyOnceTest.class);
+
+	static final int TOTAL_EVENT_COUNT = 1000; // the producer writes one per 10 ms, so it runs for 10k ms = 10 seconds
+
+	public static void main(String[] args) throws Exception {
+		final ParameterTool pt = ParameterTool.fromArgs(args);
+		LOG.info("Starting exactly once test");
+
+		final String streamName = "flink-test-" + UUID.randomUUID().toString();
+		final String accessKey = pt.getRequired("accessKey");
+		final String secretKey = pt.getRequired("secretKey");
+		final String region = pt.getRequired("region");
+
+		Properties configProps = new Properties();
+		configProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, accessKey);
+		configProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, secretKey);
+		configProps.setProperty(AWSConfigConstants.AWS_REGION, region);
+		AmazonKinesisClient client = AWSUtil.createKinesisClient(configProps);
+
+		// create a stream for the test:
+		client.createStream(streamName, 1);
+
+		// wait until stream has been created
+		DescribeStreamResult status = client.describeStream(streamName);
+		LOG.info("status {}" ,status);
+		while(!status.getStreamDescription().getStreamStatus().equals("ACTIVE")) {
+			status = client.describeStream(streamName);
+			LOG.info("Status of stream {}", status);
+			Thread.sleep(1000);
+		}
+
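+		// a local mini cluster with a single TaskManager offering 8 slots, restarting failed jobs without delay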
+		final Configuration flinkConfig = new Configuration();
+		flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
+		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
+		flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
+
+		LocalFlinkMiniCluster flink = new LocalFlinkMiniCluster(flinkConfig, false);
+		flink.start();
+
+		final int flinkPort = flink.getLeaderRPCPort();
+
+		try {
+			final AtomicReference<Throwable> producerError = new AtomicReference<>();
+			Thread producerThread = KinesisEventsGeneratorProducerThread.create(
+				TOTAL_EVENT_COUNT, 2,
+				accessKey, secretKey, region, streamName,
+				producerError, flinkPort, flinkConfig);
+			producerThread.start();
+
+			final AtomicReference<Throwable> consumerError = new AtomicReference<>();
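+			// arguments: fail artificially after 200 records, parallelism 2, checkpoint every 500 ms, 500 ms restart delay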
+			Thread consumerThread = ExactlyOnceValidatingConsumerThread.create(
+				TOTAL_EVENT_COUNT, 200, 2, 500, 500,
+				accessKey, secretKey, region, streamName,
+				consumerError, flinkPort, flinkConfig);
+			consumerThread.start();
+
+			boolean deadlinePassed = false;
+			long deadline = System.currentTimeMillis() + (1000 * 2 * 60); // wait at most for two minutes
+			// wait until both the producer and the consumer finish, or an unexpected error is thrown
+			while ((consumerThread.isAlive() || producerThread.isAlive()) &&
+				(producerError.get() == null && consumerError.get() == null)) {
+				Thread.sleep(1000);
+				if (System.currentTimeMillis() >= deadline) {
+					LOG.warn("Deadline passed");
+					deadlinePassed = true;
+					break; // enough waiting
+				}
+			}
+
+			if (producerThread.isAlive()) {
+				producerThread.interrupt();
+			}
+
+			if (consumerThread.isAlive()) {
+				consumerThread.interrupt();
+			}
+
+			if (producerError.get() != null) {
+				LOG.info("+++ TEST failed! +++");
+				throw new RuntimeException("Producer failed", producerError.get());
+			}
+			if (consumerError.get() != null) {
+				LOG.info("+++ TEST failed! +++");
+				throw new RuntimeException("Consumer failed", consumerError.get());
+			}
+
+			if (!deadlinePassed) {
+				LOG.info("+++ TEST passed! +++");
+			} else {
+				LOG.info("+++ TEST failed! +++");
+			}
+
+		} finally {
+			client.deleteStream(streamName);
+			client.shutdown();
+
+			// stopping flink
+			flink.stop();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
new file mode 100644
index 0000000..6abea2a
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.manualtests;
+
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.LimitExceededException;
+import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
+import com.amazonaws.services.kinesis.model.PutRecordsResult;
+import com.amazonaws.services.kinesis.model.PutRecordsRequest;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValidatingConsumerThread;
+import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This test first starts a data generator, producing data into Kinesis.
+ * Then, it starts a consuming topology, ensuring that all records up to a certain
+ * point have been seen. While the data generator and consuming topology are running,
+ * the Kinesis stream is resharded two times.
+ *
+ * Invocation:
+ * --region eu-central-1 --accessKey XXXXXXXXXXXX --secretKey XXXXXXXXXXXXXXXX
+ */
+public class ManualExactlyOnceWithStreamReshardingTest {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ManualExactlyOnceWithStreamReshardingTest.class);
+
+	static final int TOTAL_EVENT_COUNT = 20000; // a large enough record count so we can test resharding
+
+	public static void main(String[] args) throws Exception {
+		final ParameterTool pt = ParameterTool.fromArgs(args);
+		LOG.info("Starting exactly once with stream resharding test");
+
+		final String streamName = "flink-test-" + UUID.randomUUID().toString();
+		final String accessKey = pt.getRequired("accessKey");
+		final String secretKey = pt.getRequired("secretKey");
+		final String region = pt.getRequired("region");
+
+		final Properties configProps = new Properties();
+		configProps.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, accessKey);
+		configProps.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, secretKey);
+		configProps.setProperty(ConsumerConfigConstants.AWS_REGION, region);
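+		// no pause between shard discovery attempts, so resharding is picked up immediately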
+		configProps.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "0");
+		final AmazonKinesisClient client = AWSUtil.createKinesisClient(configProps);
+
+		// the stream is first created with 1 shard
+		client.createStream(streamName, 1);
+
+		// wait until stream has been created
+		DescribeStreamResult status = client.describeStream(streamName);
+		LOG.info("status {}", status);
+		while(!status.getStreamDescription().getStreamStatus().equals("ACTIVE")) {
+			status = client.describeStream(streamName);
+			LOG.info("Status of stream {}", status);
+			Thread.sleep(1000);
+		}
+
+		final Configuration flinkConfig = new Configuration();
+		flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
+		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
+		flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
+
+		LocalFlinkMiniCluster flink = new LocalFlinkMiniCluster(flinkConfig, false);
+		flink.start();
+
+		final int flinkPort = flink.getLeaderRPCPort();
+
+		try {
+			// we have to use a manual generator here instead of the FlinkKinesisProducer
+			// because the FlinkKinesisProducer currently has a problem where records will be resent to a shard
+			// when resharding happens; this would break the exactly-once validation, so the test would never pass
+			final AtomicReference<Throwable> producerError = new AtomicReference<>();
+			Runnable manualGenerate = new Runnable() {
+				@Override
+				public void run() {
+					AmazonKinesisClient client = AWSUtil.createKinesisClient(configProps);
+					int count = 0;
+					final int batchSize = 30;
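+					// write the events in batches of 30 records, pausing 10 ms between batches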
+					while (true) {
+						try {
+							Thread.sleep(10);
+
+							Set<PutRecordsRequestEntry> batch = new HashSet<>();
+							for (int i = count; i < count + batchSize; i++) {
+								if (i >= TOTAL_EVENT_COUNT) {
+									break;
+								}
+								batch.add(
+									new PutRecordsRequestEntry()
+										.withData(ByteBuffer.wrap(((i) + "-" + RandomStringUtils.randomAlphabetic(12)).getBytes()))
+										.withPartitionKey(UUID.randomUUID().toString()));
+							}
+							count += batchSize;
+
+							PutRecordsResult result = client.putRecords(new PutRecordsRequest().withStreamName(streamName).withRecords(batch));
+
+							// the putRecords() operation may return failed records; to keep this test simple,
+							// we do not retry failed records but report a runtime exception and let the test fail
+							if (result.getFailedRecordCount() > 0) {
+								producerError.set(new RuntimeException("The producer has failed records in one of the put batch attempts."));
+								break;
+							}
+
+							if (count >= TOTAL_EVENT_COUNT) {
+								break;
+							}
+						} catch (Exception e) {
+							producerError.set(e);
+							break; // stop generating once an error has been reported
+						}
+					}
+				}
+			};
+			Thread producerThread = new Thread(manualGenerate);
+			producerThread.start();
+
+			final AtomicReference<Throwable> consumerError = new AtomicReference<>();
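+			// arguments: fail artificially after 10000 records, parallelism 2, checkpoint every 500 ms, 500 ms restart delay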
+			Thread consumerThread = ExactlyOnceValidatingConsumerThread.create(
+				TOTAL_EVENT_COUNT, 10000, 2, 500, 500,
+				accessKey, secretKey, region, streamName,
+				consumerError, flinkPort, flinkConfig);
+			consumerThread.start();
+
+			// reshard the Kinesis stream while the producer and the consumer are running
+			Runnable splitShard = new Runnable() {
+				@Override
+				public void run() {
+					try {
+						// first, split shard in the middle of the hash range
+						Thread.sleep(5000);
+						LOG.info("Splitting shard ...");
+						client.splitShard(
+							streamName,
+							KinesisShardIdGenerator.generateFromShardOrder(0),
+							"170141183460469231731687303715884105727");
+
+						// wait until the split shard operation finishes updating ...
+						DescribeStreamResult status;
+						Random rand = new Random();
+						do {
+							status = null;
+							while (status == null) {
+								// retry until we get status
+								try {
+									status = client.describeStream(streamName);
+								} catch (LimitExceededException lee) {
+									LOG.warn("LimitExceededException while describing stream ... retrying ...");
+									Thread.sleep(rand.nextInt(1200));
+								}
+							}
+						} while (!status.getStreamDescription().getStreamStatus().equals("ACTIVE"));
+
+						// then merge again
+						Thread.sleep(7000);
+						LOG.info("Merging shards ...");
+						client.mergeShards(
+							streamName,
+							KinesisShardIdGenerator.generateFromShardOrder(1),
+							KinesisShardIdGenerator.generateFromShardOrder(2));
+					} catch (InterruptedException iex) {
+						// ignore; the test is shutting down
+					}
+				}
+			};
+			Thread splitShardThread = new Thread(splitShard);
+			splitShardThread.start();
+
+			boolean deadlinePassed = false;
+			long deadline = System.currentTimeMillis() + (1000 * 5 * 60); // wait at most for five minutes
+			// wait until both the producer and the consumer finish, or an unexpected error is thrown
+			while ((consumerThread.isAlive() || producerThread.isAlive()) &&
+				(producerError.get() == null && consumerError.get() == null)) {
+				Thread.sleep(1000);
+				if (System.currentTimeMillis() >= deadline) {
+					LOG.warn("Deadline passed");
+					deadlinePassed = true;
+					break; // enough waiting
+				}
+			}
+
+			if (producerThread.isAlive()) {
+				producerThread.interrupt();
+			}
+
+			if (consumerThread.isAlive()) {
+				consumerThread.interrupt();
+			}
+
+			if (producerError.get() != null) {
+				LOG.info("+++ TEST failed! +++");
+				throw new RuntimeException("Producer failed", producerError.get());
+			}
+
+			if (consumerError.get() != null) {
+				LOG.info("+++ TEST failed! +++");
+				throw new RuntimeException("Consumer failed", consumerError.get());
+			}
+
+			if (!deadlinePassed) {
+				LOG.info("+++ TEST passed! +++");
+			} else {
+				LOG.info("+++ TEST failed! +++");
+			}
+
+		} finally {
+			client.deleteStream(streamName);
+			client.shutdown();
+
+			// stopping flink
+			flink.stop();
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
new file mode 100644
index 0000000..35e9ef6
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis.manualtests;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;
+import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.examples.ProduceIntoKinesis;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
+
+import java.nio.ByteBuffer;
+import java.util.Properties;
+
+/**
+ * This is a manual test for the AWS Kinesis connector in Flink.
+ *
+ * It uses:
+ *  - A custom KinesisSerializationSchema
+ *  - A custom KinesisPartitioner
+ *
+ * The streams "test-flink" and "flink-test-2" must exist.
+ *
+ * Invocation:
+ * --region eu-central-1 --accessKey XXXXXXXXXXXX --secretKey XXXXXXXXXXXXXXXX
+ */
+public class ManualProducerTest {
+
+	public static void main(String[] args) throws Exception {
+		ParameterTool pt = ParameterTool.fromArgs(args);
+
+		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+		see.setParallelism(4);
+
+		DataStream<String> simpleStringStream = see.addSource(new ProduceIntoKinesis.EventsGenerator());
+
+		Properties kinesisProducerConfig = new Properties();
+		kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_REGION, pt.getRequired("region"));
+		kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
+		kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey"));
+
+		FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(
+				new KinesisSerializationSchema<String>() {
+					@Override
+					public ByteBuffer serialize(String element) {
+						return ByteBuffer.wrap(element.getBytes());
+					}
+
+					// every 10th element goes into a different stream
+					@Override
+					public String getTargetStream(String element) {
+						if (element.split("-")[0].endsWith("0")) {
+							return "flink-test-2";
+						}
+						return null; // send to default stream
+					}
+				},
+				kinesisProducerConfig
+		);
+
+		kinesis.setFailOnError(true);
+		kinesis.setDefaultStream("test-flink");
+		kinesis.setDefaultPartition("0");
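+		// partition by the last character of each element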
+		kinesis.setCustomPartitioner(new KinesisPartitioner<String>() {
+			@Override
+			public String getPartitionId(String element) {
+				int l = element.length();
+				return element.substring(l - 1, l);
+			}
+		});
+		simpleStringStream.addSink(kinesis);
+
+		see.execute();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java
new file mode 100644
index 0000000..157964c
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.testutils;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.BitSet;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.test.util.TestUtils.tryExecute;
+
+/**
+ * A thread that runs a topology with the FlinkKinesisConsumer as source, followed by two flat map
+ * functions: one that performs an artificial failure and another that validates the exactly-once guarantee.
+ */
+public class ExactlyOnceValidatingConsumerThread {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ExactlyOnceValidatingConsumerThread.class);
+
+	public static Thread create(final int totalEventCount,
+								final int failAtRecordCount,
+								final int parallelism,
+								final int checkpointInterval,
+								final long restartDelay,
+								final String awsAccessKey,
+								final String awsSecretKey,
+								final String awsRegion,
+								final String kinesisStreamName,
+								final AtomicReference<Throwable> errorHandler,
+								final int flinkPort,
+								final Configuration flinkConfig) {
+		Runnable exactlyOnceValidationConsumer = new Runnable() {
+			@Override
+			public void run() {
+				try {
+					StreamExecutionEnvironment see = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort, flinkConfig);
+					see.setParallelism(parallelism);
+					see.enableCheckpointing(checkpointInterval);
+					// allow at most two restart attempts
+					see.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, restartDelay));
+
+					// consuming topology
+					Properties consumerProps = new Properties();
+					consumerProps.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, awsAccessKey);
+					consumerProps.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, awsSecretKey);
+					consumerProps.setProperty(ConsumerConfigConstants.AWS_REGION, awsRegion);
+					// start reading from beginning
+					consumerProps.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, ConsumerConfigConstants.InitialPosition.TRIM_HORIZON.name());
+					DataStream<String> consuming = see.addSource(new FlinkKinesisConsumer<>(kinesisStreamName, new SimpleStringSchema(), consumerProps));
+					consuming
+						.flatMap(new ArtificialFailOnceFlatMapper(failAtRecordCount))
+						// validate consumed records for correctness (use only 1 instance to validate all consumed records)
+						.flatMap(new ExactlyOnceValidatingMapper(totalEventCount)).setParallelism(1);
+
+					LOG.info("Starting consuming topology");
+					tryExecute(see, "Consuming topo");
+					LOG.info("Consuming topo finished");
+				} catch (Exception e) {
+					LOG.warn("Error while running consuming topology", e);
+					errorHandler.set(e);
+				}
+			}
+		};
+
+		return new Thread(exactlyOnceValidationConsumer);
+	}
+
+	private static class ExactlyOnceValidatingMapper implements FlatMapFunction<String, String>, Checkpointed<BitSet> {
+
+		private static final Logger LOG = LoggerFactory.getLogger(ExactlyOnceValidatingMapper.class);
+
+		private final int totalEventCount;
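+		// one bit per expected event id; checkpointed so the validation state survives restarts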
+		private BitSet validator;
+
+		public ExactlyOnceValidatingMapper(int totalEventCount) {
+			this.totalEventCount = totalEventCount;
+			this.validator = new BitSet(totalEventCount);
+		}
+
+		@Override
+		public void flatMap(String value, Collector<String> out) throws Exception {
+			LOG.info("Consumed {}", value);
+
+			int id = Integer.parseInt(value.split("-")[0]);
+			if (validator.get(id)) {
+				throw new RuntimeException("Saw id " + id + " twice!");
+			}
+			validator.set(id);
+			if (id > totalEventCount - 1) {
+				throw new RuntimeException("Out of bounds ID observed");
+			}
+
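+			// all ids in [0, totalEventCount) have been seen exactly once -> finish the job successfully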
+			if (validator.nextClearBit(0) == totalEventCount) {
+				throw new SuccessException();
+			}
+		}
+
+		@Override
+		public BitSet snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+			return validator;
+		}
+
+		@Override
+		public void restoreState(BitSet state) throws Exception {
+			this.validator = state;
+		}
+	}
+
+	private static class ArtificialFailOnceFlatMapper extends RichFlatMapFunction<String, String> {
+		int count = 0;
+
+		private final int failAtRecordCount;
+
+		public ArtificialFailOnceFlatMapper(int failAtRecordCount) {
+			this.failAtRecordCount = failAtRecordCount;
+		}
+
+		@Override
+		public void flatMap(String value, Collector<String> out) throws Exception {
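+			// fail only on the first execution attempt, so the job restarts exactly once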
+			if (count++ >= failAtRecordCount && getRuntimeContext().getAttemptNumber() == 0) {
+				throw new RuntimeException("Artificial failure. Restart please.");
+			}
+			out.collect(value);
+		}
+	}
+}