Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:18 UTC

[28/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/pom.xml b/flink-connectors/flink-connector-kinesis/pom.xml
new file mode 100644
index 0000000..9e0c7e8
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/pom.xml
@@ -0,0 +1,164 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connectors</artifactId>
+		<version>1.2-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-kinesis_2.10</artifactId>
+	<name>flink-connector-kinesis</name>
+	<properties>
+		<aws.sdk.version>1.10.71</aws.sdk.version>
+		<aws.kinesis-kcl.version>1.6.2</aws.kinesis-kcl.version>
+		<aws.kinesis-kpl.version>0.10.2</aws.kinesis-kpl.version>
+	</properties>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-tests_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<!-- Note:
+			The dependencies below are licensed under the Amazon Software License.
+			For that reason, Flink includes "flink-connector-kinesis" only as an optional dependency.
+		-->
+		<dependency>
+			<groupId>com.amazonaws</groupId>
+			<artifactId>aws-java-sdk-kinesis</artifactId>
+			<version>${aws.sdk.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>com.amazonaws</groupId>
+			<artifactId>amazon-kinesis-producer</artifactId>
+			<version>${aws.kinesis-kpl.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>com.amazonaws</groupId>
+			<artifactId>amazon-kinesis-client</artifactId>
+			<version>${aws.kinesis-kcl.version}</version>
+			<!--
+				We exclude the dependencies below from the KCL since we will only be using the
+				com.amazonaws.services.kinesis.clientlibrary.types.UserRecord class, which does not need them.
+			-->
+			<exclusions>
+				<exclusion>
+					<groupId>com.amazonaws</groupId>
+					<artifactId>aws-java-sdk-dynamodb</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.amazonaws</groupId>
+					<artifactId>aws-java-sdk-cloudwatch</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>shade-flink</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+							<artifactSet combine.children="append">
+								<includes>
+									<include>com.amazonaws:*</include>
+									<include>com.google.protobuf:*</include>
+								</includes>
+							</artifactSet>
+							<relocations combine.children="override">
+								<!-- DO NOT RELOCATE GUAVA IN THIS PACKAGE -->
+								<relocation>
+									<pattern>org.objectweb.asm</pattern>
+									<shadedPattern>org.apache.flink.shaded.org.objectweb.asm</shadedPattern>
+								</relocation>
+								<relocation>
+									<pattern>com.google.protobuf</pattern>
+									<shadedPattern>org.apache.flink.kinesis.shaded.com.google.protobuf</shadedPattern>
+								</relocation>
+								<relocation>
+									<pattern>com.amazonaws</pattern>
+									<shadedPattern>org.apache.flink.kinesis.shaded.com.amazonaws</shadedPattern>
+								</relocation>
+							</relocations>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>
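
A downstream project would pick up the shaded connector with an ordinary Maven
dependency; a minimal sketch, using the coordinates declared above (the version
must match the Flink build, 1.2-SNAPSHOT here):

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kinesis_2.10</artifactId>
        <version>1.2-SNAPSHOT</version>
    </dependency>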

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
new file mode 100644
index 0000000..a62dc10
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The Flink Kinesis Consumer is an exactly-once parallel streaming data source that subscribes to multiple AWS Kinesis
+ * streams within the same AWS service region, and can handle resharding of streams. Each subtask of the consumer is
+ * responsible for fetching data records from multiple Kinesis shards. The number of shards fetched by each subtask will
+ * change as shards are closed and created by Kinesis.
+ *
+ * <p>To leverage Flink's checkpointing mechanism for exactly-once stream processing guarantees, the Flink Kinesis
+ * consumer is implemented with the AWS Java SDK, instead of the officially recommended AWS Kinesis Client Library, in
+ * order to gain low-level control over the management of stream state. The Flink Kinesis Connector also supports
+ * setting the initial starting points of Kinesis streams, namely TRIM_HORIZON and LATEST.</p>
+ *
+ * @param <T> the type of data emitted
+ */
+public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
+	implements CheckpointedAsynchronously<HashMap<KinesisStreamShard, SequenceNumber>>, ResultTypeQueryable<T> {
+
+	private static final long serialVersionUID = 4724006128720664870L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisConsumer.class);
+
+	// ------------------------------------------------------------------------
+	//  Consumer properties
+	// ------------------------------------------------------------------------
+
+	/** The names of the Kinesis streams that we will be consuming from */
+	private final List<String> streams;
+
+	/** Properties to parametrize settings such as AWS service region, initial position in stream,
+	 * shard list retrieval behaviours, etc. */
+	private final Properties configProps;
+
+	/** User-supplied deserialization schema to convert Kinesis byte messages to Flink objects */
+	private final KinesisDeserializationSchema<T> deserializer;
+
+	// ------------------------------------------------------------------------
+	//  Runtime state
+	// ------------------------------------------------------------------------
+
+	/** Per-task fetcher for Kinesis data records, where each fetcher pulls data from one or more Kinesis shards */
+	private transient KinesisDataFetcher<T> fetcher;
+
+	/** The sequence numbers in the last state snapshot of this subtask */
+	private transient HashMap<KinesisStreamShard, SequenceNumber> lastStateSnapshot;
+
+	/** The sequence numbers to restore to upon recovery from failure */
+	private transient HashMap<KinesisStreamShard, SequenceNumber> sequenceNumsToRestore;
+
+	private volatile boolean running = true;
+
+
+	// ------------------------------------------------------------------------
+	//  Constructors
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new Flink Kinesis Consumer.
+	 *
+	 * <p>The AWS credentials to use, the AWS region of the Kinesis streams, and the initial position to start
+	 * streaming from are configured with a {@link Properties} instance.</p>
+	 *
+	 * @param stream
+	 *           The single AWS Kinesis stream to read from.
+	 * @param deserializer
+	 *           The deserializer used to convert raw bytes of Kinesis records to Java objects (without key).
+	 * @param configProps
+	 *           The properties used to configure AWS credentials, AWS region, and initial starting position.
+	 */
+	public FlinkKinesisConsumer(String stream, DeserializationSchema<T> deserializer, Properties configProps) {
+		this(stream, new KinesisDeserializationSchemaWrapper<>(deserializer), configProps);
+	}
+
+	/**
+	 * Creates a new Flink Kinesis Consumer.
+	 *
+	 * <p>The AWS credentials to use, the AWS region of the Kinesis streams, and the initial position to start
+	 * streaming from are configured with a {@link Properties} instance.</p>
+	 *
+	 * @param stream
+	 *           The single AWS Kinesis stream to read from.
+	 * @param deserializer
+	 *           The keyed deserializer used to convert raw bytes of Kinesis records to Java objects.
+	 * @param configProps
+	 *           The properties used to configure AWS credentials, AWS region, and initial starting position.
+	 */
+	public FlinkKinesisConsumer(String stream, KinesisDeserializationSchema<T> deserializer, Properties configProps) {
+		this(Collections.singletonList(stream), deserializer, configProps);
+	}
+
+	/**
+	 * Creates a new Flink Kinesis Consumer.
+	 *
+	 * <p>The AWS credentials to use, the AWS region of the Kinesis streams, and the initial position to start
+	 * streaming from are configured with a {@link Properties} instance.</p>
+	 *
+	 * @param streams
+	 *           The AWS Kinesis streams to read from.
+	 * @param deserializer
+	 *           The keyed deserializer used to convert raw bytes of Kinesis records to Java objects.
+	 * @param configProps
+	 *           The properties used to configure AWS credentials, AWS region, and initial starting position.
+	 */
+	public FlinkKinesisConsumer(List<String> streams, KinesisDeserializationSchema<T> deserializer, Properties configProps) {
+		checkNotNull(streams, "streams cannot be null");
+		checkArgument(streams.size() != 0, "must be consuming at least one stream");
+		checkArgument(!streams.contains(""), "stream names cannot be empty Strings");
+		this.streams = streams;
+
+		this.configProps = checkNotNull(configProps, "configProps cannot be null");
+
+		// check the configuration properties for any conflicting settings
+		KinesisConfigUtil.validateConsumerConfiguration(this.configProps);
+
+		this.deserializer = checkNotNull(deserializer, "deserializer cannot be null");
+
+		if (LOG.isInfoEnabled()) {
+			StringBuilder sb = new StringBuilder();
+			for (String stream : streams) {
+				if (sb.length() > 0) {
+					sb.append(", ");
+				}
+				sb.append(stream);
+			}
+			LOG.info("Flink Kinesis Consumer is going to read the following streams: {}", sb.toString());
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Source life cycle
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+
+		// restore to the last known sequence numbers from the latest complete snapshot
+		if (sequenceNumsToRestore != null) {
+			if (LOG.isInfoEnabled()) {
+				LOG.info("Subtask {} is restoring sequence numbers {} from previous checkpointed state",
+					getRuntimeContext().getIndexOfThisSubtask(), sequenceNumsToRestore.toString());
+			}
+
+			// initialize sequence numbers with restored state
+			lastStateSnapshot = sequenceNumsToRestore;
+		} else {
+			// start fresh with empty sequence numbers if there are no snapshots to restore from.
+			lastStateSnapshot = new HashMap<>();
+		}
+	}
+
+	@Override
+	public void run(SourceContext<T> sourceContext) throws Exception {
+
+		// all subtasks will run a fetcher, regardless of whether or not the subtask will initially have
+		// shards to subscribe to; fetchers will continuously poll for changes in the shard list, so all subtasks
+		// can potentially have new shards to subscribe to later on
+		fetcher = new KinesisDataFetcher<>(
+			streams, sourceContext, getRuntimeContext(), configProps, deserializer);
+
+		boolean isRestoringFromFailure = (sequenceNumsToRestore != null);
+		fetcher.setIsRestoringFromFailure(isRestoringFromFailure);
+
+		// if we are restoring from a checkpoint, we iterate over the restored
+		// state and accordingly seed the fetcher with subscribed shards states
+		if (isRestoringFromFailure) {
+			for (Map.Entry<KinesisStreamShard, SequenceNumber> restored : lastStateSnapshot.entrySet()) {
+				fetcher.advanceLastDiscoveredShardOfStream(
+					restored.getKey().getStreamName(), restored.getKey().getShard().getShardId());
+
+				if (LOG.isInfoEnabled()) {
+					LOG.info("Subtask {} is seeding the fetcher with restored shard {}," +
+							" starting state set to the restored sequence number {}",
+						getRuntimeContext().getIndexOfThisSubtask(), restored.getKey().toString(), restored.getValue());
+				}
+				fetcher.registerNewSubscribedShardState(
+					new KinesisStreamShardState(restored.getKey(), restored.getValue()));
+			}
+		}
+
+		// check that we are running before starting the fetcher
+		if (!running) {
+			return;
+		}
+
+		// start the fetcher loop. The fetcher will stop running only when cancel() or
+		// close() is called, or an error is thrown by threads created by the fetcher
+		fetcher.runFetcher();
+
+		// check that the fetcher has terminated before fully closing
+		fetcher.awaitTermination();
+		sourceContext.close();
+	}
+
+	@Override
+	public void cancel() {
+		running = false;
+
+		KinesisDataFetcher fetcher = this.fetcher;
+		this.fetcher = null;
+
+		// this method might be called before the subtask actually starts running,
+		// so we must check if the fetcher is actually created
+		if (fetcher != null) {
+			try {
+				// interrupt the fetcher and stop any in-progress work
+				fetcher.shutdownFetcher();
+				fetcher.awaitTermination();
+			} catch (Exception e) {
+				LOG.warn("Error while closing Kinesis data fetcher", e);
+			}
+		}
+	}
+
+	@Override
+	public void close() throws Exception {
+		cancel();
+		super.close();
+	}
+
+	@Override
+	public TypeInformation<T> getProducedType() {
+		return deserializer.getProducedType();
+	}
+
+	// ------------------------------------------------------------------------
+	//  State Snapshot & Restore
+	// ------------------------------------------------------------------------
+
+	@Override
+	public HashMap<KinesisStreamShard, SequenceNumber> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+		if (lastStateSnapshot == null) {
+			LOG.debug("snapshotState() requested on not yet opened source; returning null.");
+			return null;
+		}
+
+		if (fetcher == null) {
+			LOG.debug("snapshotState() requested on not yet running source; returning null.");
+			return null;
+		}
+
+		if (!running) {
+			LOG.debug("snapshotState() called on closed source; returning null.");
+			return null;
+		}
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Snapshotting state ...");
+		}
+
+		lastStateSnapshot = fetcher.snapshotState();
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}",
+				lastStateSnapshot.toString(), checkpointId, checkpointTimestamp);
+		}
+
+		return lastStateSnapshot;
+	}
+
+	@Override
+	public void restoreState(HashMap<KinesisStreamShard, SequenceNumber> restoredState) throws Exception {
+		sequenceNumsToRestore = restoredState;
+	}
+}
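
A minimal usage sketch for the consumer above, assuming a stream named "my-stream" and
the AUTO credential provider (values are placeholders). Checkpointing must be enabled in
the environment for the snapshotState()/restoreState() hooks to provide exactly-once
guarantees:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(5000L); // snapshot the sequence numbers every 5 seconds

    Properties config = new Properties();
    config.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
    config.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
    // read from the earliest available records instead of the default LATEST
    config.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");

    DataStream<String> stream = env.addSource(
        new FlinkKinesisConsumer<>("my-stream", new SimpleStringSchema(), config));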

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
new file mode 100644
index 0000000..579bd6b
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis;
+
+import com.amazonaws.services.kinesis.producer.Attempt;
+import com.amazonaws.services.kinesis.producer.KinesisProducer;
+import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
+import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
+import com.amazonaws.services.kinesis.producer.UserRecordResult;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.PropertiesUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The FlinkKinesisProducer allows producing records from a Flink DataStream into Kinesis.
+ *
+ * @param <OUT> Data type to produce into Kinesis Streams
+ */
+public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisProducer.class);
+
+	/** Properties to parametrize settings such as AWS service region, access key, etc. */
+	private final Properties configProps;
+
+	/* Flag controlling the error behavior of the producer */
+	private boolean failOnError = false;
+
+	/* Name of the default stream to produce to. Can be overwritten by the serialization schema */
+	private String defaultStream;
+
+	/* Default partition id. Can be overwritten by the serialization schema */
+	private String defaultPartition;
+
+	/* Schema for turning the OUT type into a byte array. */
+	private final KinesisSerializationSchema<OUT> schema;
+
+	/* Optional custom partitioner */
+	private KinesisPartitioner<OUT> customPartitioner = null;
+
+
+	// --------------------------- Runtime fields ---------------------------
+
+
+	/* Our Kinesis instance for each parallel Flink sink */
+	private transient KinesisProducer producer;
+
+	/* Callback handling failures */
+	private transient FutureCallback<UserRecordResult> callback;
+
+	/* Field for async exception */
+	private transient volatile Throwable thrownException;
+
+
+	// --------------------------- Initialization and configuration  ---------------------------
+
+
+	/**
+	 * Create a new FlinkKinesisProducer.
+	 * This is a constructor supporting Flink's {@link SerializationSchema}.
+	 *
+	 * @param schema Serialization schema for the data type
+	 * @param configProps The properties used to configure AWS credentials and AWS region
+	 */
+	public FlinkKinesisProducer(final SerializationSchema<OUT> schema, Properties configProps) {
+
+		// create a simple wrapper for the serialization schema
+		this(new KinesisSerializationSchema<OUT>() {
+			@Override
+			public ByteBuffer serialize(OUT element) {
+				// wrap into ByteBuffer
+				return ByteBuffer.wrap(schema.serialize(element));
+			}
+			// use default stream and hash key
+			@Override
+			public String getTargetStream(OUT element) {
+				return null;
+			}
+		}, configProps);
+	}
+
+	/**
+	 * Create a new FlinkKinesisProducer.
+	 * This is a constructor supporting {@link KinesisSerializationSchema}.
+	 *
+	 * @param schema Kinesis serialization schema for the data type
+	 * @param configProps The properties used to configure AWS credentials and AWS region
+	 */
+	public FlinkKinesisProducer(KinesisSerializationSchema<OUT> schema, Properties configProps) {
+		this.configProps = checkNotNull(configProps, "configProps cannot be null");
+
+		// check the configuration properties for any conflicting settings
+		KinesisConfigUtil.validateProducerConfiguration(this.configProps);
+
+		ClosureCleaner.ensureSerializable(Objects.requireNonNull(schema));
+		this.schema = schema;
+	}
+
+	/**
+	 * If set to true, the producer will immediately fail with an exception on any error.
+	 * Otherwise, the errors are logged and the producer goes on.
+	 *
+	 * @param failOnError Error behavior flag
+	 */
+	public void setFailOnError(boolean failOnError) {
+		this.failOnError = failOnError;
+	}
+
+	/**
+	 * Set a default stream name.
+	 * @param defaultStream Name of the default Kinesis stream
+	 */
+	public void setDefaultStream(String defaultStream) {
+		this.defaultStream = defaultStream;
+	}
+
+	/**
+	 * Set the default partition id.
+	 * @param defaultPartition Name of the default partition
+	 */
+	public void setDefaultPartition(String defaultPartition) {
+		this.defaultPartition = defaultPartition;
+	}
+
+	public void setCustomPartitioner(KinesisPartitioner<OUT> partitioner) {
+		Objects.requireNonNull(partitioner);
+		ClosureCleaner.ensureSerializable(partitioner);
+		this.customPartitioner = partitioner;
+	}
+
+
+	// --------------------------- Lifecycle methods ---------------------------
+
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+
+		KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration();
+
+		producerConfig.setRegion(configProps.getProperty(ProducerConfigConstants.AWS_REGION));
+		producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
+		if (configProps.containsKey(ProducerConfigConstants.COLLECTION_MAX_COUNT)) {
+			producerConfig.setCollectionMaxCount(PropertiesUtil.getLong(configProps,
+					ProducerConfigConstants.COLLECTION_MAX_COUNT, producerConfig.getCollectionMaxCount(), LOG));
+		}
+		if (configProps.containsKey(ProducerConfigConstants.AGGREGATION_MAX_COUNT)) {
+			producerConfig.setAggregationMaxCount(PropertiesUtil.getLong(configProps,
+					ProducerConfigConstants.AGGREGATION_MAX_COUNT, producerConfig.getAggregationMaxCount(), LOG));
+		}
+
+		producer = new KinesisProducer(producerConfig);
+		callback = new FutureCallback<UserRecordResult>() {
+			@Override
+			public void onSuccess(UserRecordResult result) {
+				if (!result.isSuccessful()) {
+					if (failOnError) {
+						thrownException = new RuntimeException("Record was not sent successfully");
+					} else {
+						LOG.warn("Record was not sent successfully");
+					}
+				}
+			}
+
+			@Override
+			public void onFailure(Throwable t) {
+				if (failOnError) {
+					thrownException = t;
+				} else {
+					LOG.warn("An exception occurred while processing a record", t);
+				}
+			}
+		};
+
+		if (this.customPartitioner != null) {
+			this.customPartitioner.initialize(getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
+		}
+
+		LOG.info("Started Kinesis producer instance for region '{}'", producerConfig.getRegion());
+	}
+
+	@Override
+	public void invoke(OUT value) throws Exception {
+		if (this.producer == null) {
+			throw new RuntimeException("Kinesis producer has been closed");
+		}
+		if (thrownException != null) {
+			String errorMessages = "";
+			if (thrownException instanceof UserRecordFailedException) {
+				List<Attempt> attempts = ((UserRecordFailedException) thrownException).getResult().getAttempts();
+				for (Attempt attempt: attempts) {
+					if (attempt.getErrorMessage() != null) {
+						errorMessages += attempt.getErrorMessage() + "\n";
+					}
+				}
+			}
+			if (failOnError) {
+				throw new RuntimeException("An exception was thrown while processing a record: " + errorMessages, thrownException);
+			} else {
+				LOG.warn("An exception was thrown while processing a record: {}", errorMessages, thrownException);
+				thrownException = null; // reset
+			}
+		}
+
+		String stream = defaultStream;
+		String partition = defaultPartition;
+
+		ByteBuffer serialized = schema.serialize(value);
+
+		// maybe set custom stream
+		String customStream = schema.getTargetStream(value);
+		if (customStream != null) {
+			stream = customStream;
+		}
+
+		String explicitHashKey = null;
+		// maybe set custom partition
+		if (customPartitioner != null) {
+			partition = customPartitioner.getPartitionId(value);
+			explicitHashKey = customPartitioner.getExplicitHashKey(value);
+		}
+
+		if (stream == null) {
+			if (failOnError) {
+				throw new RuntimeException("No target stream set");
+			} else {
+				LOG.warn("No target stream set. Skipping record");
+				return;
+			}
+		}
+
+		ListenableFuture<UserRecordResult> cb = producer.addUserRecord(stream, partition, explicitHashKey, serialized);
+		Futures.addCallback(cb, callback);
+	}
+
+	@Override
+	public void close() throws Exception {
+		LOG.info("Closing producer");
+		super.close();
+		KinesisProducer kp = this.producer;
+		this.producer = null;
+		if (kp != null) {
+			LOG.info("Flushing outstanding {} records", kp.getOutstandingRecordsCount());
+			// try to flush all outstanding records
+			while (kp.getOutstandingRecordsCount() > 0) {
+				kp.flush();
+				try {
+					Thread.sleep(500);
+				} catch (InterruptedException e) {
+					LOG.warn("Flushing was interrupted.");
+					// stop the blocking flushing and destroy producer immediately
+					break;
+				}
+			}
+			LOG.info("Flushing done. Destroying producer instance.");
+			kp.destroy();
+		}
+	}
+
+}
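
The KinesisSerializationSchema accepted by the second constructor also allows routing
individual records to different streams: invoke() falls back to the default stream
whenever getTargetStream() returns null. A hedged sketch (stream names and the "audit:"
prefix are placeholders):

    FlinkKinesisProducer<String> producer = new FlinkKinesisProducer<>(
        new KinesisSerializationSchema<String>() {
            @Override
            public ByteBuffer serialize(String element) {
                return ByteBuffer.wrap(element.getBytes(StandardCharsets.UTF_8));
            }

            @Override
            public String getTargetStream(String element) {
                // send audit events to a dedicated stream, everything else to the default
                return element.startsWith("audit:") ? "audit-stream" : null;
            }
        }, producerConfig);
    producer.setDefaultStream("main-stream");
    producer.setDefaultPartition("0");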

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java
new file mode 100644
index 0000000..bd23abe
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis;
+
+
+import java.io.Serializable;
+
+public abstract class KinesisPartitioner<T> implements Serializable {
+
+	/**
+	 * Returns a partition id based on the input element.
+	 * @param element Element to partition
+	 * @return A string representing the partition id
+	 */
+	public abstract String getPartitionId(T element);
+
+	/**
+	 * Optional method for setting an explicit hash key
+	 * @param element Element to get the hash key for
+	 * @return the hash key for the element
+	 */
+	public String getExplicitHashKey(T element) {
+		return null;
+	}
+
+	/**
+	 * Optional initializer.
+	 *
+	 * @param indexOfThisSubtask Index of this partitioner instance
+	 * @param numberOfParallelSubtasks Total number of parallel instances
+	 */
+	public void initialize(int indexOfThisSubtask, int numberOfParallelSubtasks) {
+		//
+	}
+}
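
A minimal concrete partitioner sketch: it keys each record on the text before the first
'-', so records sharing a prefix land on the same shard (the record format is
hypothetical):

    public class KeyPrefixPartitioner extends KinesisPartitioner<String> {
        @Override
        public String getPartitionId(String element) {
            int cut = element.indexOf('-');
            // fall back to the whole element when no '-' is present
            return cut > 0 ? element.substring(0, cut) : element;
        }
    }

It would be attached with producer.setCustomPartitioner(new KeyPrefixPartitioner()).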

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
new file mode 100644
index 0000000..01d4f00
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.config;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+
+/**
+ * Configuration keys for AWS service usage
+ */
+public class AWSConfigConstants {
+
+	/**
+	 * Possible configuration values for the type of credential provider to use when accessing AWS Kinesis.
+	 * Internally, a corresponding implementation of {@link AWSCredentialsProvider} will be used.
+	 */
+	public enum CredentialProvider {
+
+		/** Look for the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY to create AWS credentials */
+		ENV_VAR,
+
+		/** Look for Java system properties aws.accessKeyId and aws.secretKey to create AWS credentials */
+		SYS_PROP,
+
+		/** Use an AWS credentials profile file to create the AWS credentials */
+		PROFILE,
+
+		/** Simply create AWS credentials by supplying the AWS access key ID and AWS secret key in the configuration properties */
+		BASIC,
+
+		/** A credentials provider chain will be used that searches for credentials in this order: ENV_VARS, SYS_PROPS, PROFILE, and finally the EC2 instance metadata */
+		AUTO,
+	}
+
+	/** The AWS region of the Kinesis streams to be pulled ("us-east-1" is used if not set) */
+	public static final String AWS_REGION = "aws.region";
+
+	/** The AWS access key ID to use when setting credentials provider type to BASIC */
+	public static final String AWS_ACCESS_KEY_ID = "aws.credentials.provider.basic.accesskeyid";
+
+	/** The AWS secret key to use when setting credentials provider type to BASIC */
+	public static final String AWS_SECRET_ACCESS_KEY = "aws.credentials.provider.basic.secretkey";
+
+	/** The credential provider type to use when AWS credentials are required (BASIC is used if not set) */
+	public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";
+
+	/** Optional configuration for profile path if credential provider type is set to be PROFILE */
+	public static final String AWS_PROFILE_PATH = "aws.credentials.provider.profile.path";
+
+	/** Optional configuration for profile name if credential provider type is set to be PROFILE */
+	public static final String AWS_PROFILE_NAME = "aws.credentials.provider.profile.name";
+
+	/** The AWS endpoint for Kinesis (derived from the AWS region setting if not set) */
+	public static final String AWS_ENDPOINT = "aws.endpoint";
+
+}
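
A sketch of wiring these keys together for PROFILE-based credentials, assuming the
provider type is given as the enum name and using placeholder profile values:

    Properties config = new Properties();
    config.setProperty(AWSConfigConstants.AWS_REGION, "eu-central-1");
    config.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "PROFILE");
    config.setProperty(AWSConfigConstants.AWS_PROFILE_NAME, "flink-user");
    config.setProperty(AWSConfigConstants.AWS_PROFILE_PATH, "/home/user/.aws/credentials");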

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
new file mode 100644
index 0000000..76c20ed
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.config;
+
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer;
+import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+
+/**
+ * Optional consumer-specific configuration keys and default values for {@link FlinkKinesisConsumer}
+ */
+public class ConsumerConfigConstants extends AWSConfigConstants {
+
+	/**
+	 * The initial position to start reading shards from. This will affect the {@link ShardIteratorType} used
+	 * when the consumer tasks retrieve the first shard iterator for each Kinesis shard.
+	 */
+	public enum InitialPosition {
+
+		/** Start reading from the earliest possible record in the stream (excluding expired data records) */
+		TRIM_HORIZON(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM),
+
+		/** Start reading from the latest incoming record */
+		LATEST(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM);
+
+		private SentinelSequenceNumber sentinelSequenceNumber;
+
+		InitialPosition(SentinelSequenceNumber sentinelSequenceNumber) {
+			this.sentinelSequenceNumber = sentinelSequenceNumber;
+		}
+
+		public SentinelSequenceNumber toSentinelSequenceNumber() {
+			return this.sentinelSequenceNumber;
+		}
+	}
+
+	/** The initial position to start reading Kinesis streams from (LATEST is used if not set) */
+	public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";
+
+	/** The base backoff time between each describeStream attempt */
+	public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
+
+	/** The maximum backoff time between each describeStream attempt */
+	public static final String STREAM_DESCRIBE_BACKOFF_MAX = "flink.stream.describe.backoff.max";
+
+	/** The power constant for exponential backoff between each describeStream attempt */
+	public static final String STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = "flink.stream.describe.backoff.expconst";
+
+	/** The maximum number of records to try to get each time we fetch records from an AWS Kinesis shard */
+	public static final String SHARD_GETRECORDS_MAX = "flink.shard.getrecords.maxrecordcount";
+
+	/** The maximum number of getRecords attempts if we get ProvisionedThroughputExceededException */
+	public static final String SHARD_GETRECORDS_RETRIES = "flink.shard.getrecords.maxretries";
+
+	/** The base backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException */
+	public static final String SHARD_GETRECORDS_BACKOFF_BASE = "flink.shard.getrecords.backoff.base";
+
+	/** The maximum backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException */
+	public static final String SHARD_GETRECORDS_BACKOFF_MAX = "flink.shard.getrecords.backoff.max";
+
+	/** The power constant for exponential backoff between each getRecords attempt */
+	public static final String SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT = "flink.shard.getrecords.backoff.expconst";
+
+	/** The interval between each getRecords request to an AWS Kinesis shard in milliseconds */
+	public static final String SHARD_GETRECORDS_INTERVAL_MILLIS = "flink.shard.getrecords.intervalmillis";
+
+	/** The maximum number of getShardIterator attempts if we get ProvisionedThroughputExceededException */
+	public static final String SHARD_GETITERATOR_RETRIES = "flink.shard.getiterator.maxretries";
+
+	/** The base backoff time between getShardIterator attempts if we get a ProvisionedThroughputExceededException */
+	public static final String SHARD_GETITERATOR_BACKOFF_BASE = "flink.shard.getiterator.backoff.base";
+
+	/** The maximum backoff time between getShardIterator attempts if we get a ProvisionedThroughputExceededException */
+	public static final String SHARD_GETITERATOR_BACKOFF_MAX = "flink.shard.getiterator.backoff.max";
+
+	/** The power constant for exponential backoff between each getShardIterator attempt */
+	public static final String SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT = "flink.shard.getiterator.backoff.expconst";
+
+	/** The interval between each attempt to discover new shards */
+	public static final String SHARD_DISCOVERY_INTERVAL_MILLIS = "flink.shard.discovery.intervalmillis";
+
+	// ------------------------------------------------------------------------
+	//  Default values for consumer configuration
+	// ------------------------------------------------------------------------
+
+	public static final String DEFAULT_STREAM_INITIAL_POSITION = InitialPosition.LATEST.toString();
+
+	public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE = 1000L;
+
+	public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX = 5000L;
+
+	public static final double DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
+
+	public static final int DEFAULT_SHARD_GETRECORDS_MAX = 100;
+
+	public static final int DEFAULT_SHARD_GETRECORDS_RETRIES = 3;
+
+	public static final long DEFAULT_SHARD_GETRECORDS_BACKOFF_BASE = 300L;
+
+	public static final long DEFAULT_SHARD_GETRECORDS_BACKOFF_MAX = 1000L;
+
+	public static final double DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
+
+	public static final long DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS = 0L;
+
+	public static final int DEFAULT_SHARD_GETITERATOR_RETRIES = 3;
+
+	public static final long DEFAULT_SHARD_GETITERATOR_BACKOFF_BASE = 300L;
+
+	public static final long DEFAULT_SHARD_GETITERATOR_BACKOFF_MAX = 1000L;
+
+	public static final double DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
+
+	public static final long DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS = 10000L;
+
+	/**
+	 * To avoid shard iterators expiring in {@link ShardConsumer}s, the value for the configured
+	 * getRecords interval cannot exceed 5 minutes, which is the expiry time for retrieved iterators.
+	 */
+	public static final long MAX_SHARD_GETRECORDS_INTERVAL_MILLIS = 300000L;
+
+}
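
A sketch of overriding a few of the defaults above through the consumer's Properties;
the values are illustrative, not recommendations:

    Properties config = new Properties();
    config.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
    // fetch up to 500 records per getRecords call instead of the default 100
    config.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "500");
    // pause 200 ms between getRecords calls to stay below shard read limits
    config.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "200");
    // probe for resharding every 30 seconds instead of the default 10
    config.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "30000");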

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
new file mode 100644
index 0000000..1edddfc
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.config;
+
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+
+/**
+ * Optional producer-specific configuration keys for {@link FlinkKinesisProducer}
+ */
+public class ProducerConfigConstants extends AWSConfigConstants {
+
+	/** Maximum number of items to pack into a PutRecords request. */
+	public static final String COLLECTION_MAX_COUNT = "aws.producer.collectionMaxCount";
+
+	/** Maximum number of items to pack into an aggregated record. */
+	public static final String AGGREGATION_MAX_COUNT = "aws.producer.aggregationMaxCount";
+
+}
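
Both keys are forwarded to the corresponding KPL settings in FlinkKinesisProducer#open();
a sketch with illustrative values:

    Properties producerConfig = new Properties();
    producerConfig.setProperty(ProducerConfigConstants.AWS_REGION, "us-east-1");
    // batch at most 100 records into a single PutRecords request
    producerConfig.setProperty(ProducerConfigConstants.COLLECTION_MAX_COUNT, "100");
    // aggregate at most 50 user records into one Kinesis record
    producerConfig.setProperty(ProducerConfigConstants.AGGREGATION_MAX_COUNT, "50");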

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
new file mode 100644
index 0000000..55668c6
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis.examples;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import java.util.Properties;
+
+/**
+ * This is an example of how to consume data from Kinesis.
+ */
+public class ConsumeFromKinesis {
+
+	public static void main(String[] args) throws Exception {
+		ParameterTool pt = ParameterTool.fromArgs(args);
+
+		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+		see.setParallelism(1);
+
+		Properties kinesisConsumerConfig = new Properties();
+		kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, pt.getRequired("region"));
+		kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accesskey"));
+		kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretkey"));
+
+		DataStream<String> kinesis = see.addSource(new FlinkKinesisConsumer<>(
+			"flink-test",
+			new SimpleStringSchema(),
+			kinesisConsumerConfig));
+
+		kinesis.print();
+
+		see.execute();
+	}
+
+}
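
The example reads a single stream; the consumer also accepts several streams in the same
region through its List-based constructor, sketched here with placeholder stream names:

    DataStream<String> kinesis = see.addSource(new FlinkKinesisConsumer<>(
        Arrays.asList("flink-test", "flink-test-2"),
        new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
        kinesisConsumerConfig));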

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
new file mode 100644
index 0000000..d178137
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis.examples;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import java.util.Properties;
+
+/**
+ * This is an example of how to produce data into Kinesis.
+ */
+public class ProduceIntoKinesis {
+
+	public static void main(String[] args) throws Exception {
+		ParameterTool pt = ParameterTool.fromArgs(args);
+
+		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+		see.setParallelism(1);
+
+		DataStream<String> simpleStringStream = see.addSource(new EventsGenerator());
+
+		Properties kinesisProducerConfig = new Properties();
+		kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_REGION, pt.getRequired("region"));
+		kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
+		kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey"));
+
+		FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(
+				new SimpleStringSchema(), kinesisProducerConfig);
+
+		kinesis.setFailOnError(true);
+		kinesis.setDefaultStream("flink-test");
+		kinesis.setDefaultPartition("0");
+
+		simpleStringStream.addSink(kinesis);
+
+		see.execute();
+	}
+
+	public static class EventsGenerator implements SourceFunction<String> {
+		private boolean running = true;
+
+		@Override
+		public void run(SourceContext<String> ctx) throws Exception {
+			long seq = 0;
+			while (running) {
+				Thread.sleep(10);
+				ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12));
+			}
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
new file mode 100644
index 0000000..a06fdca
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -0,0 +1,679 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
+import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.util.InstantiationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A KinesisDataFetcher is responsible for fetching data from multiple Kinesis shards. Each parallel subtask instantiates
+ * and runs a single fetcher throughout the subtask's lifetime. The fetcher accomplishes the following:
+ * <ul>
+ *     <li>1. continuously poll Kinesis to discover shards that the subtask should subscribe to. The subscribed subset
+ *     		  of shards, including future new shards, is non-overlapping across subtasks (no two subtasks will be
+ *     		  subscribed to the same shard) and deterministic across subtask restores (the subtask will always subscribe
+ *     		  to the same subset of shards even after restoring)</li>
+ *     <li>2. decide where in each discovered shard the fetcher should start reading from</li>
+ *     <li>3. subscribe to shards by creating a single thread for each shard</li>
+ * </ul>
+ *
+ * <p>The fetcher manages two states: 1) last seen shard ids of each subscribed stream (used for continuous shard discovery),
+ * and 2) last processed sequence numbers of each subscribed shard. Since operations on the second state will be performed
+ * by multiple threads, these operations should only be done using the handler methods provided in this class.
+ */
+public class KinesisDataFetcher<T> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class);
+
+	// ------------------------------------------------------------------------
+	//  Consumer-wide settings
+	// ------------------------------------------------------------------------
+
+	/** Configuration properties for the Flink Kinesis Consumer */
+	private final Properties configProps;
+
+	/** The list of Kinesis streams that the consumer is subscribing to */
+	private final List<String> streams;
+
+	/**
+	 * The deserialization schema we will be using to convert Kinesis records to Flink objects.
+	 * Note that since this might not be thread-safe, {@link ShardConsumer}s using this must
+	 * clone a copy using {@link KinesisDataFetcher#getClonedDeserializationSchema()}.
+	 */
+	private final KinesisDeserializationSchema<T> deserializationSchema;
+
+	// ------------------------------------------------------------------------
+	//  Subtask-specific settings
+	// ------------------------------------------------------------------------
+
+	/** Runtime context of the subtask that this fetcher was created in */
+	private final RuntimeContext runtimeContext;
+
+	private final int totalNumberOfConsumerSubtasks;
+
+	private final int indexOfThisConsumerSubtask;
+
+	/**
+	 * This flag should be set by {@link FlinkKinesisConsumer} using
+	 * {@link KinesisDataFetcher#setIsRestoringFromFailure(boolean)}
+	 */
+	private boolean isRestoredFromFailure;
+
+	// ------------------------------------------------------------------------
+	//  Executor services to run created threads
+	// ------------------------------------------------------------------------
+
+	/** Executor service to run {@link ShardConsumer}s to consume Kinesis shards */
+	private final ExecutorService shardConsumersExecutor;
+
+	// ------------------------------------------------------------------------
+	//  Managed state, accessed and updated across multiple threads
+	// ------------------------------------------------------------------------
+
+	/** The last discovered shard ids of each subscribed stream, updated as the fetcher discovers new shards.
+	 * Note: this state will be updated if new shards are found when {@link KinesisDataFetcher#discoverNewShardsToSubscribe()} is called.
+	 */
+	private final Map<String, String> subscribedStreamsToLastDiscoveredShardIds;
+
+	/**
+	 * The shards, along with their last processed sequence numbers, that this fetcher is subscribed to. The fetcher
+	 * will add new subscribed shard states to this list as it discovers new shards. {@link ShardConsumer} threads update
+	 * the last processed sequence number of subscribed shards as they fetch and process records.
+	 *
+	 * <p>Note that since multiple {@link ShardConsumer} threads will be performing operations on this list, all operations
+	 * must be wrapped in synchronized blocks on the {@link KinesisDataFetcher#checkpointLock} lock. For this purpose,
+	 * all threads must use the following thread-safe methods this class provides to operate on this list:
+	 * <ul>
+	 *     <li>{@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}</li>
+	 *     <li>{@link KinesisDataFetcher#updateState(int, SequenceNumber)}</li>
+	 *     <li>{@link KinesisDataFetcher#emitRecordAndUpdateState(T, long, int, SequenceNumber)}</li>
+	 * </ul>
+	 */
+	private final List<KinesisStreamShardState> subscribedShardsState;
+
+	private final SourceFunction.SourceContext<T> sourceContext;
+
+	/** Checkpoint lock, also used to synchronize operations on subscribedShardsState */
+	private final Object checkpointLock;
+
+	/** Reference to the first error thrown by any of the {@link ShardConsumer} threads */
+	private final AtomicReference<Throwable> error;
+
+	/** The Kinesis proxy that the fetcher will be using to discover new shards */
+	private final KinesisProxyInterface kinesis;
+
+	/** Thread that executed runFetcher() */
+	private Thread mainThread;
+
+	/**
+	 * The current number of shards that are actively read by this fetcher.
+	 *
+	 * This value is updated in {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)},
+	 * and {@link KinesisDataFetcher#updateState(int, SequenceNumber)}.
+	 */
+	private final AtomicInteger numberOfActiveShards = new AtomicInteger(0);
+
+	private volatile boolean running = true;
+
+	/**
+	 * Creates a Kinesis Data Fetcher.
+	 *
+	 * @param streams the streams to subscribe to
+	 * @param sourceContext context of the source function
+	 * @param runtimeContext this subtask's runtime context
+	 * @param configProps the consumer configuration properties
+	 * @param deserializationSchema deserialization schema
+	 */
+	public KinesisDataFetcher(List<String> streams,
+							SourceFunction.SourceContext<T> sourceContext,
+							RuntimeContext runtimeContext,
+							Properties configProps,
+							KinesisDeserializationSchema<T> deserializationSchema) {
+		this(streams,
+			sourceContext,
+			sourceContext.getCheckpointLock(),
+			runtimeContext,
+			configProps,
+			deserializationSchema,
+			new AtomicReference<Throwable>(),
+			new LinkedList<KinesisStreamShardState>(),
+			createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
+			KinesisProxy.create(configProps));
+	}
+
+	/** This constructor is exposed for testing purposes */
+	protected KinesisDataFetcher(List<String> streams,
+								SourceFunction.SourceContext<T> sourceContext,
+								Object checkpointLock,
+								RuntimeContext runtimeContext,
+								Properties configProps,
+								KinesisDeserializationSchema<T> deserializationSchema,
+								AtomicReference<Throwable> error,
+								LinkedList<KinesisStreamShardState> subscribedShardsState,
+								HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds,
+								KinesisProxyInterface kinesis) {
+		this.streams = checkNotNull(streams);
+		this.configProps = checkNotNull(configProps);
+		this.sourceContext = checkNotNull(sourceContext);
+		this.checkpointLock = checkNotNull(checkpointLock);
+		this.runtimeContext = checkNotNull(runtimeContext);
+		this.totalNumberOfConsumerSubtasks = runtimeContext.getNumberOfParallelSubtasks();
+		this.indexOfThisConsumerSubtask = runtimeContext.getIndexOfThisSubtask();
+		this.deserializationSchema = checkNotNull(deserializationSchema);
+		this.kinesis = checkNotNull(kinesis);
+
+		this.error = checkNotNull(error);
+		this.subscribedShardsState = checkNotNull(subscribedShardsState);
+		this.subscribedStreamsToLastDiscoveredShardIds = checkNotNull(subscribedStreamsToLastDiscoveredShardIds);
+
+		this.shardConsumersExecutor =
+			createShardConsumersThreadPool(runtimeContext.getTaskNameWithSubtasks());
+	}
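+
+	// Illustrative sketch (not part of this commit): a test could exercise the fetcher against a
+	// stubbed proxy through the protected constructor above, e.g.
+	//
+	//   KinesisProxyInterface stubKinesis = ...; // test double returning canned shard lists
+	//   KinesisDataFetcher<String> fetcher = new KinesisDataFetcher<>(
+	//       streams, sourceContext, sourceContext.getCheckpointLock(), runtimeContext, configProps,
+	//       deserializationSchema, new AtomicReference<Throwable>(),
+	//       new LinkedList<KinesisStreamShardState>(),
+	//       createInitialSubscribedStreamsToLastDiscoveredShardsState(streams), stubKinesis);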
+
+	/**
+	 * Starts the fetcher. After starting the fetcher, it can only
+	 * be stopped by calling {@link KinesisDataFetcher#shutdownFetcher()}.
+	 *
+	 * @throws Exception the first error or exception thrown by the fetcher or any of the threads created by the fetcher.
+	 */
+	public void runFetcher() throws Exception {
+
+		// check that we are running before proceeding
+		if (!running) {
+			return;
+		}
+
+		this.mainThread = Thread.currentThread();
+
+		// ------------------------------------------------------------------------
+		//  Procedures before starting the infinite while loop:
+		// ------------------------------------------------------------------------
+
+		//  1. query for any new shards that may have been created while the Kinesis consumer was not running,
+		//     and register them to the subscribedShardState list.
+		if (LOG.isDebugEnabled()) {
+			String logFormat = (!isRestoredFromFailure)
+				? "Subtask {} is trying to discover initial shards ..."
+				: "Subtask {} is trying to discover any new shards that were created while the consumer wasn't " +
+				"running due to failure ...";
+
+			LOG.debug(logFormat, indexOfThisConsumerSubtask);
+		}
+		List<KinesisStreamShard> newShardsCreatedWhileNotRunning = discoverNewShardsToSubscribe();
+		for (KinesisStreamShard shard : newShardsCreatedWhileNotRunning) {
+			// the starting state for new shards created while the consumer wasn't running depends on whether or not
+			// we are starting fresh (not restoring from a checkpoint); when we are starting fresh, this simply means
+			// all existing shards of streams we are subscribing to are new shards; when we are restoring from checkpoint,
+			// any new shards due to Kinesis resharding from the time of the checkpoint will be considered new shards.
+			InitialPosition initialPosition = InitialPosition.valueOf(configProps.getProperty(
+				ConsumerConfigConstants.STREAM_INITIAL_POSITION, ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION));
+
+			SentinelSequenceNumber startingStateForNewShard = (isRestoredFromFailure)
+				? SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM
+				: initialPosition.toSentinelSequenceNumber();
+
+			if (LOG.isInfoEnabled()) {
+				String logFormat = (!isRestoredFromFailure)
+					? "Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}"
+					: "Subtask {} will be seeded with new shard {} that was created while the consumer wasn't " +
+					"running due to failure, starting state set as sequence number {}";
+
+				LOG.info(logFormat, indexOfThisConsumerSubtask, shard.toString(), startingStateForNewShard.get());
+			}
+			registerNewSubscribedShardState(new KinesisStreamShardState(shard, startingStateForNewShard.get()));
+		}
+
+		//  2. check that there is at least one shard in the subscribed streams to consume from (can be done by
+		//     checking if at least one value in subscribedStreamsToLastDiscoveredShardIds is not null)
+		boolean hasShards = false;
+		StringBuilder streamsWithNoShardsFound = new StringBuilder();
+		for (Map.Entry<String, String> streamToLastDiscoveredShardEntry : subscribedStreamsToLastDiscoveredShardIds.entrySet()) {
+			if (streamToLastDiscoveredShardEntry.getValue() != null) {
+				hasShards = true;
+			} else {
+				streamsWithNoShardsFound.append(streamToLastDiscoveredShardEntry.getKey()).append(", ");
+			}
+		}
+
+		if (streamsWithNoShardsFound.length() != 0 && LOG.isWarnEnabled()) {
+			LOG.warn("Subtask {} has failed to find any shards for the following subscribed streams: {}",
+				indexOfThisConsumerSubtask, streamsWithNoShardsFound.toString());
+		}
+
+		if (!hasShards) {
+			throw new RuntimeException("No shards could be found for any of the subscribed streams: " + streams);
+		}
+
+		//  3. start consuming any shard state we already have in the subscribedShardState up to this point; the
+		//     subscribedShardState may already be seeded with values due to step 1., or explicitly added by the
+		//     consumer using a restored state checkpoint
+		for (int seededStateIndex = 0; seededStateIndex < subscribedShardsState.size(); seededStateIndex++) {
+			KinesisStreamShardState seededShardState = subscribedShardsState.get(seededStateIndex);
+
+			// only start a consuming thread if the seeded subscribed shard has not been completely read already
+			if (!seededShardState.getLastProcessedSequenceNum().equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
+
+				if (LOG.isInfoEnabled()) {
+					LOG.info("Subtask {} will start consuming seeded shard {} from sequence number {} with ShardConsumer {}",
+						indexOfThisConsumerSubtask, seededShardState.getKinesisStreamShard().toString(),
+						seededShardState.getLastProcessedSequenceNum(), seededStateIndex);
+				}
+
+				shardConsumersExecutor.submit(
+					new ShardConsumer<>(
+						this,
+						seededStateIndex,
+						subscribedShardsState.get(seededStateIndex).getKinesisStreamShard(),
+						subscribedShardsState.get(seededStateIndex).getLastProcessedSequenceNum()));
+			}
+		}
+
+		// ------------------------------------------------------------------------
+
+		// finally, start the infinite shard discovery and consumer launching loop;
+		// we will escape from this loop only when shutdownFetcher() or stopWithError() is called
+
+		final long discoveryIntervalMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS)));
+
+		// FLINK-4341:
+		// For downstream operators that work on time (e.g. window operators), we are required to emit a max value watermark
+		// for subtasks that won't continue to have shards to read from unless resharding happens in the future, otherwise
+		// the downstream watermarks would not advance, leading to unbounded accumulating state.
+		//
+		// The side-effect of this limitation is that on resharding, we must fail hard if the newly discovered shard
+		// is to be subscribed by a subtask that has previously emitted a max value watermark, otherwise the watermarks
+		// will be messed up.
+		//
+		// There are 2 cases where we need to either emit a max value watermark, or deliberately fail hard:
+		//  (a) if this subtask has no more shards to read from unless resharding happens in the future, we emit a max
+		//      value watermark. This case is encountered when 1) all previously read shards by this subtask were closed
+		//      due to resharding, 2) this subtask was initially only subscribed to closed shards while the consumer
+		//      was told to start from TRIM_HORIZON, or 3) there were initially no shards for this subtask to read on startup.
+		//  (b) this subtask has discovered new shards to read from due to a reshard; if this subtask has already emitted
+		//      a max value watermark, we must deliberately fail hard to avoid messing up the watermarks. The new shards
+		//      will be subscribed by this subtask after restore as initial shards on startup.
+		//
+		// TODO: This is a temporary workaround until a min-watermark information service is available in the JobManager
+		// Please see FLINK-4341 for more detail
+
+		boolean emittedMaxValueWatermark = false;
+
+		if (this.numberOfActiveShards.get() == 0) {
+			// FLINK-4341 workaround case (a) - please see the above for details on this case
+			LOG.info("Subtask {} has no initial shards to read on startup; emitting max value watermark ...",
+				indexOfThisConsumerSubtask);
+			sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
+			emittedMaxValueWatermark = true;
+		}
+
+		while (running) {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Subtask {} is trying to discover new shards that were created due to resharding ...",
+					indexOfThisConsumerSubtask);
+			}
+			List<KinesisStreamShard> newShardsDueToResharding = discoverNewShardsToSubscribe();
+
+			// -- NOTE: Potential race condition between newShardsDueToResharding and numberOfActiveShards --
+			// Since numberOfActiveShards is updated by parallel shard consuming threads in updateState(), there exists
+			// a race condition with the currently queried newShardsDueToResharding. Therefore, numberOfActiveShards
+			// may not correctly reflect the discover result in the below case determination. This may lead to incorrect
+			// case determination on the current discovery attempt, but can still be correctly handled on future attempts.
+			//
+			// Although this can be resolved by wrapping the current shard discovery attempt with the below
+			// case determination within a synchronized block on the checkpoint lock for atomicity, there will be
+			// considerable throughput performance regression as shard discovery is a remote call to AWS. Therefore,
+			// since the case determination is a temporary workaround for FLINK-4341, the race condition is tolerable as
+			// we can still eventually handle max value watermark emitting / deliberately failing on successive
+			// discovery attempts.
+
+			if (newShardsDueToResharding.size() == 0 && this.numberOfActiveShards.get() == 0 && !emittedMaxValueWatermark) {
+				// FLINK-4341 workaround case (a) - please see the above for details on this case
+				LOG.info("Subtask {} has completed reading all shards; emitting max value watermark ...",
+					indexOfThisConsumerSubtask);
+				sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
+				emittedMaxValueWatermark = true;
+			} else if (newShardsDueToResharding.size() > 0 && emittedMaxValueWatermark) {
+				// FLINK-4341 workaround case (b) - please see the above for details on this case
+				//
+				// Note that in the case where, on resharding, this subtask ceased to read all of its previous shards
+				// but new shards are also to be subscribed by this subtask immediately after, emittedMaxValueWatermark
+				// will be false; this allows the fetcher to continue reading the new shards without failing in such cases.
+				// However, due to the race condition mentioned above, we might still fall into case (a) first, and
+				// then (b) on the next discovery attempt. Although the failure is ideally unnecessary, max value
+				// watermark emitting still remains to be correct.
+
+				LOG.warn("Subtask {} has discovered {} new shards to subscribe, but is failing hard to avoid messing" +
+						" up watermarks; the new shards will be subscribed by this subtask after restore ...",
+					indexOfThisConsumerSubtask, newShardsDueToResharding.size());
+				throw new RuntimeException("Deliberate failure to avoid messing up watermarks");
+			}
+
+			for (KinesisStreamShard shard : newShardsDueToResharding) {
+				// since there may be delay in discovering a new shard, all new shards due to
+				// resharding should be read starting from the earliest record possible
+				KinesisStreamShardState newShardState =
+					new KinesisStreamShardState(shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
+				int newStateIndex = registerNewSubscribedShardState(newShardState);
+
+				if (LOG.isInfoEnabled()) {
+					LOG.info("Subtask {} has discovered a new shard {} due to resharding, and will start consuming " +
+							"the shard from sequence number {} with ShardConsumer {}",
+						indexOfThisConsumerSubtask, newShardState.getKinesisStreamShard().toString(),
+						newShardState.getLastProcessedSequenceNum(), newStateIndex);
+				}
+
+				shardConsumersExecutor.submit(
+					new ShardConsumer<>(
+						this,
+						newStateIndex,
+						newShardState.getKinesisStreamShard(),
+						newShardState.getLastProcessedSequenceNum()));
+			}
+
+			// we also check if we are running here so that we won't start the discovery sleep
+			// interval if the running flag was set to false in the middle of the while loop
+			if (running && discoveryIntervalMillis != 0) {
+				try {
+					Thread.sleep(discoveryIntervalMillis);
+				} catch (InterruptedException iex) {
+					// the sleep may be interrupted by shutdownFetcher()
+				}
+			}
+		}
+
+		// make sure all resources have been terminated before leaving
+		awaitTermination();
+
+		// any error thrown in the shard consumer threads will be thrown to the main thread
+		Throwable throwable = this.error.get();
+		if (throwable != null) {
+			if (throwable instanceof Exception) {
+				throw (Exception) throwable;
+			} else if (throwable instanceof Error) {
+				throw (Error) throwable;
+			} else {
+				throw new Exception(throwable);
+			}
+		}
+	}
+
+	/**
+	 * Creates a snapshot of the current last processed sequence numbers of each subscribed shard.
+	 *
+	 * @return state snapshot
+	 */
+	public HashMap<KinesisStreamShard, SequenceNumber> snapshotState() {
+		// this method assumes that the checkpoint lock is held
+		assert Thread.holdsLock(checkpointLock);
+
+		HashMap<KinesisStreamShard, SequenceNumber> stateSnapshot = new HashMap<>();
+		for (KinesisStreamShardState shardWithState : subscribedShardsState) {
+			stateSnapshot.put(shardWithState.getKinesisStreamShard(), shardWithState.getLastProcessedSequenceNum());
+		}
+		return stateSnapshot;
+	}
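+
+	// Example (illustrative): a snapshot taken by a subtask reading two shards of one stream might map
+	// shardId-000000000000 to a concrete sequence number such as "49546986..." and shardId-000000000002
+	// to SENTINEL_SHARD_ENDING_SEQUENCE_NUM, the latter recording that the shard was fully read
+	// before the checkpoint.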
+
+	/**
+	 * Starts shutting down the fetcher. Must be called to allow {@link KinesisDataFetcher#runFetcher()} to complete.
+	 * Once called, the shutdown procedure will be executed and all shard consuming threads will be interrupted.
+	 */
+	public void shutdownFetcher() {
+		running = false;
+		mainThread.interrupt(); // the main thread may be sleeping for the discovery interval
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Shutting down the shard consumer threads of subtask {} ...", indexOfThisConsumerSubtask);
+		}
+		shardConsumersExecutor.shutdownNow();
+	}
+
+	/** After calling {@link KinesisDataFetcher#shutdownFetcher()}, this can be called to await the fetcher shutdown */
+	public void awaitTermination() throws InterruptedException {
+		while (!shardConsumersExecutor.isTerminated()) {
+			Thread.sleep(50);
+		}
+	}
+
+	/** Called by created threads to pass on errors. Only the first thrown error is set.
+	 * Once set, the shutdown process will be executed and all shard consuming threads will be interrupted. */
+	protected void stopWithError(Throwable throwable) {
+		if (this.error.compareAndSet(null, throwable)) {
+			shutdownFetcher();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Functions that update the subscribedStreamToLastDiscoveredShardIds state
+	// ------------------------------------------------------------------------
+
+	/** Updates the last discovered shard of a subscribed stream; only updates if the update is valid */
+	public void advanceLastDiscoveredShardOfStream(String stream, String shardId) {
+		String lastSeenShardIdOfStream = this.subscribedStreamsToLastDiscoveredShardIds.get(stream);
+
+		// the update is valid only if the given shard id is greater
+		// than the previous last seen shard id of the stream
+		if (lastSeenShardIdOfStream == null) {
+			// if not previously set, simply put as the last seen shard id
+			this.subscribedStreamsToLastDiscoveredShardIds.put(stream, shardId);
+		} else if (KinesisStreamShard.compareShardIds(shardId, lastSeenShardIdOfStream) > 0) {
+			this.subscribedStreamsToLastDiscoveredShardIds.put(stream, shardId);
+		}
+	}
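+
+	// Example (illustrative, assuming shard ids of the usual "shardId-000000000012" form, which
+	// KinesisStreamShard.compareShardIds orders by their numeric suffix): if the last seen shard id of a
+	// stream is "shardId-000000000012", an update to "shardId-000000000013" is accepted, while a late
+	// or duplicate discovery of "shardId-000000000011" is ignored.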
+
+	/**
+	 * A utility function that does the following:
+	 *
+	 * 1. Find new shards for each stream that we haven't seen before
+	 * 2. For each new shard, determine whether this consumer subtask should subscribe to it;
+	 * 	  if yes, the shard is added to the returned list
+	 * 3. Update the subscribedStreamsToLastDiscoveredShardIds state so that we won't get shards
+	 *    that we have already seen before the next time this function is called
+	 */
+	private List<KinesisStreamShard> discoverNewShardsToSubscribe() throws InterruptedException {
+
+		List<KinesisStreamShard> newShardsToSubscribe = new LinkedList<>();
+
+		GetShardListResult shardListResult = kinesis.getShardList(subscribedStreamsToLastDiscoveredShardIds);
+		if (shardListResult.hasRetrievedShards()) {
+			Set<String> streamsWithNewShards = shardListResult.getStreamsWithRetrievedShards();
+
+			for (String stream : streamsWithNewShards) {
+				List<KinesisStreamShard> newShardsOfStream = shardListResult.getRetrievedShardListOfStream(stream);
+				for (KinesisStreamShard newShard : newShardsOfStream) {
+					if (isThisSubtaskShouldSubscribeTo(newShard, totalNumberOfConsumerSubtasks, indexOfThisConsumerSubtask)) {
+						newShardsToSubscribe.add(newShard);
+					}
+				}
+
+				advanceLastDiscoveredShardOfStream(
+					stream, shardListResult.getLastSeenShardOfStream(stream).getShard().getShardId());
+			}
+		}
+
+		return newShardsToSubscribe;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Functions to get / set information about the consumer
+	// ------------------------------------------------------------------------
+
+	public void setIsRestoringFromFailure(boolean isRestoredFromFailure) {
+		this.isRestoredFromFailure = isRestoredFromFailure;
+	}
+
+	protected Properties getConsumerConfiguration() {
+		return configProps;
+	}
+
+	protected KinesisDeserializationSchema<T> getClonedDeserializationSchema() {
+		try {
+			return InstantiationUtil.clone(deserializationSchema, runtimeContext.getUserCodeClassLoader());
+		} catch (IOException | ClassNotFoundException ex) {
+			// this really shouldn't happen; simply wrap it around a runtime exception
+			throw new RuntimeException(ex);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Thread-safe operations for record emitting and shard state updating
+	//  that assure atomicity with respect to the checkpoint lock
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Atomic operation to collect a record and update state to the sequence number of the record.
+	 * This method is called by {@link ShardConsumer}s.
+	 *
+	 * @param record the record to collect
+	 * @param recordTimestamp timestamp to attach to the collected record
+	 * @param shardStateIndex index of the shard to update in subscribedShardsState;
+	 *                        this index should be the returned value from
+	 *                        {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}, called
+	 *                        when the shard state was registered.
+	 * @param lastSequenceNumber the last sequence number value to update
+	 */
+	protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
+		synchronized (checkpointLock) {
+			sourceContext.collectWithTimestamp(record, recordTimestamp);
+			updateState(shardStateIndex, lastSequenceNumber);
+		}
+	}
+
+	/**
+	 * Update the shard to last processed sequence number state.
+	 * This method is called by {@link ShardConsumer}s.
+	 *
+	 * @param shardStateIndex index of the shard to update in subscribedShardsState;
+	 *                        this index should be the returned value from
+	 *                        {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}, called
+	 *                        when the shard state was registered.
+	 * @param lastSequenceNumber the last sequence number value to update
+	 */
+	protected void updateState(int shardStateIndex, SequenceNumber lastSequenceNumber) {
+		synchronized (checkpointLock) {
+			subscribedShardsState.get(shardStateIndex).setLastProcessedSequenceNum(lastSequenceNumber);
+
+			// if a shard's state is updated to be SENTINEL_SHARD_ENDING_SEQUENCE_NUM by its consumer thread,
+			// we've finished reading the shard and should mark it as no longer active
+			if (lastSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
+				this.numberOfActiveShards.decrementAndGet();
+				LOG.info("Subtask {} has reached the end of subscribed shard: {}",
+					indexOfThisConsumerSubtask, subscribedShardsState.get(shardStateIndex).getKinesisStreamShard());
+			}
+		}
+	}
+
+	/**
+	 * Register a new subscribed shard state.
+	 *
+	 * @param newSubscribedShardState the new shard state that this fetcher is to be subscribed to
+	 * @return the index of the registered shard state in the subscribedShardsState list
+	 */
+	public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribedShardState) {
+		synchronized (checkpointLock) {
+			subscribedShardsState.add(newSubscribedShardState);
+
+			// If a registered shard has initial state that is not SENTINEL_SHARD_ENDING_SEQUENCE_NUM (will be the case
+			// if the consumer had already finished reading a shard before we failed and restored), we determine that
+			// this subtask has a new active shard
+			if (!newSubscribedShardState.getLastProcessedSequenceNum().equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
+				this.numberOfActiveShards.incrementAndGet();
+			}
+
+			return subscribedShardsState.size() - 1;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Miscellaneous utility functions
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Utility function to determine whether a shard should be subscribed by this consumer subtask.
+	 *
+	 * @param shard the shard to check
+	 * @param totalNumberOfConsumerSubtasks total number of consumer subtasks
+	 * @param indexOfThisConsumerSubtask index of this consumer subtask
+	 * @return true if this consumer subtask should subscribe to the given shard
+	 */
+	private static boolean isThisSubtaskShouldSubscribeTo(KinesisStreamShard shard,
+														int totalNumberOfConsumerSubtasks,
+														int indexOfThisConsumerSubtask) {
+		return (Math.abs(shard.hashCode() % totalNumberOfConsumerSubtasks)) == indexOfThisConsumerSubtask;
+	}
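+
+	// Worked example (illustrative): with 3 consumer subtasks, a shard whose hashCode() is -7 maps to
+	// Math.abs(-7 % 3) == 1, so only the subtask with index 1 subscribes to it. The assignment depends
+	// only on the shard's hash and the parallelism, which is what makes the subscribed subsets
+	// non-overlapping across subtasks and stable across restores.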
+
+	private static ExecutorService createShardConsumersThreadPool(final String subtaskName) {
+		// the counter must be shared by all threads created by this factory;
+		// creating it inside newThread() would name every thread "...-thread-0"
+		final AtomicLong threadCount = new AtomicLong(0);
+		return Executors.newCachedThreadPool(new ThreadFactory() {
+			@Override
+			public Thread newThread(Runnable runnable) {
+				Thread thread = new Thread(runnable);
+				thread.setName("shardConsumers-" + subtaskName + "-thread-" + threadCount.getAndIncrement());
+				thread.setDaemon(true);
+				return thread;
+			}
+		});
+	}
+
+	/**
+	 * Utility function to create an initial map of the last discovered shard id of each subscribed stream, set to null.
+	 * This is called in the constructor; correct values will be set later on by calling advanceLastDiscoveredShardOfStream().
+	 *
+	 * @param streams the list of subscribed streams
+	 * @return the initial map for subscribedStreamsToLastDiscoveredShardIds
+	 */
+	protected static HashMap<String, String> createInitialSubscribedStreamsToLastDiscoveredShardsState(List<String> streams) {
+		HashMap<String, String> initial = new HashMap<>();
+		for (String stream : streams) {
+			initial.put(stream, null);
+		}
+		return initial;
+	}
+}
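
For orientation, the lifecycle of this fetcher as driven by a consuming source function looks roughly like the sketch below. This is a minimal sketch of the public API shown above, assuming streams, sourceContext, runtimeContext, configProps and deserializationSchema are in scope; the exact wiring inside FlinkKinesisConsumer may differ:

    KinesisDataFetcher<String> fetcher = new KinesisDataFetcher<>(
        streams, sourceContext, runtimeContext, configProps, deserializationSchema);
    fetcher.setIsRestoringFromFailure(false); // true after seeding restored shard states

    fetcher.runFetcher();                     // blocks until shutdownFetcher() is called

    // from the checkpointing thread, while holding the checkpoint lock:
    synchronized (sourceContext.getCheckpointLock()) {
        HashMap<KinesisStreamShard, SequenceNumber> snapshot = fetcher.snapshotState();
    }

    // from cancel():
    fetcher.shutdownFetcher();
    fetcher.awaitTermination();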