You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:17 UTC

[27/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
new file mode 100644
index 0000000..612a4a7
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Thread that does the actual data pulling from AWS Kinesis shards. Each thread is in charge of one Kinesis shard only.
+ */
+public class ShardConsumer<T> implements Runnable {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ShardConsumer.class);
+
+	private final KinesisDeserializationSchema<T> deserializer;
+
+	private final KinesisProxyInterface kinesis;
+
+	private final int subscribedShardStateIndex;
+
+	private final KinesisDataFetcher<T> fetcherRef;
+
+	private final KinesisStreamShard subscribedShard;
+
+	private final int maxNumberOfRecordsPerFetch;
+	private final long fetchIntervalMillis;
+
+	private SequenceNumber lastSequenceNum;
+
+	/**
+	 * Creates a shard consumer.
+	 *
+	 * @param fetcherRef reference to the owning fetcher
+	 * @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to
+	 * @param subscribedShard the shard this consumer is subscribed to
+	 * @param lastSequenceNum the sequence number in the shard to start consuming
+	 */
+	public ShardConsumer(KinesisDataFetcher<T> fetcherRef,
+						Integer subscribedShardStateIndex,
+						KinesisStreamShard subscribedShard,
+						SequenceNumber lastSequenceNum) {
+		this(fetcherRef,
+			subscribedShardStateIndex,
+			subscribedShard,
+			lastSequenceNum,
+			KinesisProxy.create(fetcherRef.getConsumerConfiguration()));
+	}
+
+	/** This constructor is exposed for testing purposes */
+	protected ShardConsumer(KinesisDataFetcher<T> fetcherRef,
+							Integer subscribedShardStateIndex,
+							KinesisStreamShard subscribedShard,
+							SequenceNumber lastSequenceNum,
+							KinesisProxyInterface kinesis) {
+		this.fetcherRef = checkNotNull(fetcherRef);
+		this.subscribedShardStateIndex = checkNotNull(subscribedShardStateIndex);
+		this.subscribedShard = checkNotNull(subscribedShard);
+		this.lastSequenceNum = checkNotNull(lastSequenceNum);
+		checkArgument(
+			!lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()),
+			"Should not start a ShardConsumer if the shard has already been completely read.");
+
+		this.deserializer = fetcherRef.getClonedDeserializationSchema();
+
+		Properties consumerConfig = fetcherRef.getConsumerConfiguration();
+		this.kinesis = kinesis;
+		this.maxNumberOfRecordsPerFetch = Integer.valueOf(consumerConfig.getProperty(
+			ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
+			Integer.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX)));
+		this.fetchIntervalMillis = Long.valueOf(consumerConfig.getProperty(
+			ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
+			Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS)));
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void run() {
+		String nextShardItr;
+
+		try {
+			// before infinitely looping, we set the initial nextShardItr appropriately
+
+			if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get())) {
+				// if the shard is already closed, there will be no latest next record to get for this shard
+				if (subscribedShard.isClosed()) {
+					nextShardItr = null;
+				} else {
+					nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), null);
+				}
+			} else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get())) {
+				nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.TRIM_HORIZON.toString(), null);
+			} else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
+				nextShardItr = null;
+			} else {
+				// we will be starting from an actual sequence number (due to restore from failure).
+				// if the last sequence number refers to an aggregated record, we need to clean up any dangling sub-records
+				// from the last aggregated record; otherwise, we can simply start iterating from the record right after.
+
+				if (lastSequenceNum.isAggregated()) {
+					String itrForLastAggregatedRecord =
+						kinesis.getShardIterator(subscribedShard, ShardIteratorType.AT_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
+
+					// get only the last aggregated record
+					GetRecordsResult getRecordsResult = getRecords(itrForLastAggregatedRecord, 1);
+
+					List<UserRecord> fetchedRecords = deaggregateRecords(
+						getRecordsResult.getRecords(),
+						subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
+						subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
+
+					long lastSubSequenceNum = lastSequenceNum.getSubSequenceNumber();
+					for (UserRecord record : fetchedRecords) {
+						// we have found a dangling sub-record if it has a larger subsequence number
+						// than our last sequence number; if so, collect the record and update state
+						if (record.getSubSequenceNumber() > lastSubSequenceNum) {
+							deserializeRecordForCollectionAndUpdateState(record);
+						}
+					}
+
+					// set the nextShardItr so we can continue iterating in the next while loop
+					nextShardItr = getRecordsResult.getNextShardIterator();
+				} else {
+					// the last record was non-aggregated, so we can simply start from the next record
+					nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
+				}
+			}
+
+			while(isRunning()) {
+				if (nextShardItr == null) {
+					fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
+
+					// we can close this consumer thread once we've reached the end of the subscribed shard
+					break;
+				} else {
+					if (fetchIntervalMillis != 0) {
+						Thread.sleep(fetchIntervalMillis);
+					}
+
+					GetRecordsResult getRecordsResult = getRecords(nextShardItr, maxNumberOfRecordsPerFetch);
+
+					// each of the Kinesis records may be aggregated, so we must deaggregate them before proceeding
+					List<UserRecord> fetchedRecords = deaggregateRecords(
+						getRecordsResult.getRecords(),
+						subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
+						subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
+
+					for (UserRecord record : fetchedRecords) {
+						deserializeRecordForCollectionAndUpdateState(record);
+					}
+
+					nextShardItr = getRecordsResult.getNextShardIterator();
+				}
+			}
+		} catch (Throwable t) {
+			fetcherRef.stopWithError(t);
+		}
+	}
+
+	/**
+	 * The loop in run() checks this before fetching next batch of records. Since this runnable will be executed
+	 * by the ExecutorService {@link KinesisDataFetcher#shardConsumersExecutor}, the only way to close down this thread
+	 * would be by calling shutdownNow() on {@link KinesisDataFetcher#shardConsumersExecutor} and let the executor service
+	 * interrupt all currently running {@link ShardConsumer}s.
+	 */
+	private boolean isRunning() {
+		return !Thread.interrupted();
+	}
+
+	/**
+	 * Deserializes a record for collection, and accordingly updates the shard state in the fetcher. The last
+	 * successfully collected sequence number in this shard consumer is also updated so that
+	 * {@link ShardConsumer#getRecords(String, int)} may be able to use the correct sequence number to refresh shard
+	 * iterators if necessary.
+	 *
+	 * Note that the server-side Kinesis timestamp is attached to the record when collected. When the
+	 * user programs uses {@link TimeCharacteristic#EventTime}, this timestamp will be used by default.
+	 *
+	 * @param record record to deserialize and collect
+	 * @throws IOException
+	 */
+	private void deserializeRecordForCollectionAndUpdateState(UserRecord record)
+		throws IOException {
+		ByteBuffer recordData = record.getData();
+
+		byte[] dataBytes = new byte[recordData.remaining()];
+		recordData.get(dataBytes);
+
+		final long approxArrivalTimestamp = record.getApproximateArrivalTimestamp().getTime();
+
+		final T value = deserializer.deserialize(
+			dataBytes,
+			record.getPartitionKey(),
+			record.getSequenceNumber(),
+			approxArrivalTimestamp,
+			subscribedShard.getStreamName(),
+			subscribedShard.getShard().getShardId());
+
+		SequenceNumber collectedSequenceNumber = (record.isAggregated())
+			? new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber())
+			: new SequenceNumber(record.getSequenceNumber());
+
+		fetcherRef.emitRecordAndUpdateState(
+			value,
+			approxArrivalTimestamp,
+			subscribedShardStateIndex,
+			collectedSequenceNumber);
+
+		lastSequenceNum = collectedSequenceNumber;
+	}
+
+	/**
+	 * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected
+	 * AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on
+	 * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should
+	 * be used for the next call to this method.
+	 *
+	 * Note: it is important that this method is not called again before all the records from the last result have been
+	 * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise
+	 * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to
+	 * incorrect shard iteration if the iterator had to be refreshed.
+	 *
+	 * @param shardItr shard iterator to use
+	 * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt
+	 * @return get records result
+	 * @throws InterruptedException
+	 */
+	private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException {
+		GetRecordsResult getRecordsResult = null;
+		while (getRecordsResult == null) {
+			try {
+				getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);
+			} catch (ExpiredIteratorException eiEx) {
+				LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
+					" refreshing the iterator ...", shardItr, subscribedShard);
+				shardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
+
+				// sleep for the fetch interval before the next getRecords attempt with the refreshed iterator
+				if (fetchIntervalMillis != 0) {
+					Thread.sleep(fetchIntervalMillis);
+				}
+			}
+		}
+		return getRecordsResult;
+	}
+
+	@SuppressWarnings("unchecked")
+	protected static List<UserRecord> deaggregateRecords(List<Record> records, String startingHashKey, String endingHashKey) {
+		return UserRecord.deaggregate(records, new BigInteger(startingHashKey), new BigInteger(endingHashKey));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
new file mode 100644
index 0000000..53ed11b
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.model;
+
+import com.amazonaws.services.kinesis.model.Shard;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A serializable representation of an AWS Kinesis Stream shard. It is basically a wrapper class around the information
+ * provided along with {@link com.amazonaws.services.kinesis.model.Shard}, together with the name of the stream the
+ * shard belongs to, plus some extra utility methods to determine whether or not a shard is closed and to validate and
+ * compare shard ids.
+ */
+public class KinesisStreamShard implements Serializable {
+
+	private static final long serialVersionUID = -6004217801761077536L;
+
+	/** Valid shard ids are "shardId-" followed by exactly 12 digits; compiled once rather than on every check. */
+	private static final java.util.regex.Pattern SHARD_ID_PATTERN = java.util.regex.Pattern.compile("^shardId-\\d{12}");
+
+	/** Length of the "shardId-" prefix; the numeric part of a valid shard id starts at this index. */
+	private static final int SHARD_ID_DIGITS_OFFSET = 8;
+
+	private final String streamName;
+	private final Shard shard;
+
+	private final int cachedHash;
+
+	/**
+	 * Create a new KinesisStreamShard.
+	 *
+	 * @param streamName
+	 *           the name of the Kinesis stream that this shard belongs to
+	 * @param shard
+	 *           the actual AWS Shard instance that will be wrapped within this KinesisStreamShard
+	 */
+	public KinesisStreamShard(String streamName, Shard shard) {
+		this.streamName = checkNotNull(streamName);
+		this.shard = checkNotNull(shard);
+
+		// since our description of Kinesis Streams shards can be fully defined with the stream name and shard id,
+		// our hash doesn't need to use hash code of Amazon's description of Shards, which uses other info for calculation
+		int hash = 17;
+		hash = 37 * hash + streamName.hashCode();
+		hash = 37 * hash + shard.getShardId().hashCode();
+		this.cachedHash = hash;
+	}
+
+	public String getStreamName() {
+		return streamName;
+	}
+
+	/**
+	 * Returns whether the wrapped shard is closed, i.e. whether its sequence number range has an ending sequence number.
+	 *
+	 * @return true if the shard has an ending sequence number
+	 */
+	public boolean isClosed() {
+		return (shard.getSequenceNumberRange().getEndingSequenceNumber() != null);
+	}
+
+	public Shard getShard() {
+		return shard;
+	}
+
+	@Override
+	public String toString() {
+		return "KinesisStreamShard{" +
+			"streamName='" + streamName + "'" +
+			", shard='" + shard.toString() + "'}";
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (!(obj instanceof KinesisStreamShard)) {
+			return false;
+		}
+
+		if (obj == this) {
+			return true;
+		}
+
+		KinesisStreamShard other = (KinesisStreamShard) obj;
+
+		return streamName.equals(other.getStreamName()) && shard.equals(other.getShard());
+	}
+
+	@Override
+	public int hashCode() {
+		return cachedHash;
+	}
+
+	/**
+	 * Utility function to compare two shard ids by their numeric part.
+	 *
+	 * @param firstShardId first shard id to compare
+	 * @param secondShardId second shard id to compare
+	 * @return a value less than 0 if the first shard id is smaller than the second shard id,
+	 *         a value larger than 0 if the first shard id is larger than the second shard id,
+	 *         or 0 if they are equal
+	 * @throws IllegalArgumentException if either shard id does not have a valid format
+	 */
+	public static int compareShardIds(String firstShardId, String secondShardId) {
+		if (!isValidShardId(firstShardId)) {
+			throw new IllegalArgumentException("The first shard id has invalid format.");
+		}
+
+		if (!isValidShardId(secondShardId)) {
+			throw new IllegalArgumentException("The second shard id has invalid format.");
+		}
+
+		// digit segment of the shard id starts right after the "shardId-" prefix
+		return Long.compare(
+			Long.parseLong(firstShardId.substring(SHARD_ID_DIGITS_OFFSET)),
+			Long.parseLong(secondShardId.substring(SHARD_ID_DIGITS_OFFSET)));
+	}
+
+	/**
+	 * Checks if a shard id has valid format.
+	 * Kinesis stream shard ids have 12-digit numbers left-padded with 0's,
+	 * prefixed with "shardId-", ex. "shardId-000000000015".
+	 *
+	 * @param shardId the shard id to check; may be null
+	 * @return whether the shard id is valid (false for null)
+	 */
+	public static boolean isValidShardId(String shardId) {
+		return shardId != null && SHARD_ID_PATTERN.matcher(shardId).matches();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
new file mode 100644
index 0000000..00181da
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.model;
+
+/**
+ * A mutable holder pairing a {@link KinesisStreamShard} with the sequence number of the last record processed from it.
+ * The shard is fixed at construction; only the sequence number can be updated afterwards.
+ */
+public class KinesisStreamShardState {
+
+	/** The shard whose progress this state tracks. */
+	private KinesisStreamShard kinesisStreamShard;
+
+	/** Sequence number of the last processed record of {@link #kinesisStreamShard}. */
+	private SequenceNumber lastProcessedSequenceNum;
+
+	/**
+	 * Creates a state entry for the given shard.
+	 *
+	 * @param kinesisStreamShard the tracked shard
+	 * @param lastProcessedSequenceNum the initial last-processed sequence number
+	 */
+	public KinesisStreamShardState(KinesisStreamShard kinesisStreamShard, SequenceNumber lastProcessedSequenceNum) {
+		this.lastProcessedSequenceNum = lastProcessedSequenceNum;
+		this.kinesisStreamShard = kinesisStreamShard;
+	}
+
+	/** @return the tracked shard */
+	public KinesisStreamShard getKinesisStreamShard() {
+		return kinesisStreamShard;
+	}
+
+	/** @return the sequence number of the last processed record */
+	public SequenceNumber getLastProcessedSequenceNum() {
+		return lastProcessedSequenceNum;
+	}
+
+	/**
+	 * Replaces the last-processed sequence number.
+	 *
+	 * @param update the new last-processed sequence number
+	 */
+	public void setLastProcessedSequenceNum(SequenceNumber update) {
+		lastProcessedSequenceNum = update;
+	}
+
+	@Override
+	public String toString() {
+		final String shardText = kinesisStreamShard.toString();
+		final String sequenceText = lastProcessedSequenceNum.toString();
+		return "KinesisStreamShardState{"
+			+ "kinesisStreamShard='" + shardText + "'"
+			+ ", lastProcessedSequenceNumber='" + sequenceText + "'}";
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (this == obj) {
+			return true;
+		}
+		if (!(obj instanceof KinesisStreamShardState)) {
+			return false;
+		}
+
+		final KinesisStreamShardState that = (KinesisStreamShardState) obj;
+		if (!kinesisStreamShard.equals(that.getKinesisStreamShard())) {
+			return false;
+		}
+		return lastProcessedSequenceNum.equals(that.getLastProcessedSequenceNum());
+	}
+
+	@Override
+	public int hashCode() {
+		final int combined = kinesisStreamShard.hashCode() + lastProcessedSequenceNum.hashCode();
+		return 37 * combined;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
new file mode 100644
index 0000000..8182201
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.model;
+
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+
+/**
+ * Special flag values for sequence numbers in shards to indicate special positions.
+ * The value is initially set by {@link FlinkKinesisConsumer} when {@link KinesisDataFetcher}s are created.
+ * The KinesisDataFetchers will use this value to determine how to retrieve the starting shard iterator from AWS Kinesis.
+ */
+public enum SentinelSequenceNumber {
+
+	/** Flag value for shard's sequence numbers to indicate that the
+	 * shard should start to be read from the latest incoming records */
+	SENTINEL_LATEST_SEQUENCE_NUM(new SequenceNumber("LATEST_SEQUENCE_NUM")),
+
+	/** Flag value for shard's sequence numbers to indicate that the shard should
+	 * start to be read from the earliest records that haven't expired yet */
+	SENTINEL_EARLIEST_SEQUENCE_NUM(new SequenceNumber("EARLIEST_SEQUENCE_NUM")),
+
+	/** Flag value to indicate that we have already read the last record of this shard
+	 * (Note: Kinesis shards that have been closed due to a split or merge will have an ending data record) */
+	SENTINEL_SHARD_ENDING_SEQUENCE_NUM(new SequenceNumber("SHARD_ENDING_SEQUENCE_NUM"));
+
+	/** The flag sequence number represented by this constant; final so the shared enum singleton cannot change. */
+	private final SequenceNumber sentinel;
+
+	SentinelSequenceNumber(SequenceNumber sentinel) {
+		this.sentinel = sentinel;
+	}
+
+	/**
+	 * Returns the flag {@link SequenceNumber} of this constant. The same instance is returned on every call.
+	 *
+	 * @return the sentinel sequence number
+	 */
+	public SequenceNumber get() {
+		return sentinel;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java
new file mode 100644
index 0000000..021f53f
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.model;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Serializable value object for the position of a Kinesis record: a main sequence number plus a subsequence number.
+ * A non-negative subsequence number means the position refers to a sub-record inside an aggregated Kinesis record
+ * (its value being the sub-record's order within the aggregation); -1 means the record is not aggregated.
+ */
+public class SequenceNumber implements Serializable {
+
+	private static final long serialVersionUID = 876972197938972667L;
+
+	/** Separator between the sequence number and subsequence number in {@link #toString()}. */
+	private static final String DELIMITER = "-";
+
+	/** Subsequence number used for records that are not part of an aggregated record. */
+	private static final long NON_AGGREGATED_SUB_SEQUENCE_NUMBER = -1;
+
+	private final String sequenceNumber;
+	private final long subSequenceNumber;
+
+	/** Hash code computed once at construction, since both fields are immutable. */
+	private final int cachedHash;
+
+	/**
+	 * Creates the position of a non-aggregated Kinesis record, i.e. one without a subsequence number.
+	 *
+	 * @param sequenceNumber the sequence number; must not be null
+	 */
+	public SequenceNumber(String sequenceNumber) {
+		this(sequenceNumber, NON_AGGREGATED_SUB_SEQUENCE_NUMBER);
+	}
+
+	/**
+	 * Creates a position from a sequence number and a subsequence number. Pass -1 as the subsequence number for a
+	 * non-aggregated Kinesis record, or a non-negative subsequence number for a sub-record of an aggregated one.
+	 *
+	 * @param sequenceNumber the sequence number; must not be null
+	 * @param subSequenceNumber the subsequence number (-1 to represent non-aggregated Kinesis records)
+	 */
+	public SequenceNumber(String sequenceNumber, long subSequenceNumber) {
+		this.sequenceNumber = checkNotNull(sequenceNumber);
+		this.subSequenceNumber = subSequenceNumber;
+
+		// identical to Long.valueOf(subSequenceNumber).hashCode(), computed without boxing
+		final int subSequenceHash = (int) (subSequenceNumber ^ (subSequenceNumber >>> 32));
+		this.cachedHash = 37 * (sequenceNumber.hashCode() + subSequenceHash);
+	}
+
+	/** @return true if this position refers to a sub-record of an aggregated record (non-negative subsequence number) */
+	public boolean isAggregated() {
+		return 0L <= subSequenceNumber;
+	}
+
+	/** @return the main sequence number */
+	public String getSequenceNumber() {
+		return sequenceNumber;
+	}
+
+	/** @return the subsequence number, or a negative value for non-aggregated records */
+	public long getSubSequenceNumber() {
+		return subSequenceNumber;
+	}
+
+	@Override
+	public String toString() {
+		return isAggregated()
+			? sequenceNumber + DELIMITER + subSequenceNumber
+			: sequenceNumber;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (this == obj) {
+			return true;
+		}
+		if (!(obj instanceof SequenceNumber)) {
+			return false;
+		}
+
+		final SequenceNumber that = (SequenceNumber) obj;
+		return sequenceNumber.equals(that.getSequenceNumber())
+			&& subSequenceNumber == that.getSubSequenceNumber();
+	}
+
+	@Override
+	public int hashCode() {
+		return cachedHash;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
new file mode 100644
index 0000000..04b1654
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Result holder for a {@link KinesisProxyInterface#getShardList(Map)} call: the shards retrieved
+ * from Kinesis, grouped per stream name in the order they were added.
+ */
+public class GetShardListResult {
+
+	/** Retrieved shards keyed by stream name; a stream only gets an entry once a shard is added for it. */
+	private final Map<String, LinkedList<KinesisStreamShard>> streamsToRetrievedShardList = new HashMap<>();
+
+	/** Appends one retrieved shard to the list kept for {@code stream}. */
+	public void addRetrievedShardToStream(String stream, KinesisStreamShard retrievedShard) {
+		shardListOf(stream).add(retrievedShard);
+	}
+
+	/** Appends all given shards to the list kept for {@code stream}; an empty list leaves the result untouched. */
+	public void addRetrievedShardsToStream(String stream, List<KinesisStreamShard> retrievedShards) {
+		if (retrievedShards.isEmpty()) {
+			return;
+		}
+		shardListOf(stream).addAll(retrievedShards);
+	}
+
+	/** Returns the shards retrieved for {@code stream}, or {@code null} if none were added. */
+	public List<KinesisStreamShard> getRetrievedShardListOfStream(String stream) {
+		List<KinesisStreamShard> shards = streamsToRetrievedShardList.get(stream);
+		return (shards == null) ? null : shards;
+	}
+
+	/** Returns the most recently added shard of {@code stream}, or {@code null} if none were added. */
+	public KinesisStreamShard getLastSeenShardOfStream(String stream) {
+		LinkedList<KinesisStreamShard> shards = streamsToRetrievedShardList.get(stream);
+		return (shards == null) ? null : shards.getLast();
+	}
+
+	/** Returns whether at least one shard was added for any stream. */
+	public boolean hasRetrievedShards() {
+		return streamsToRetrievedShardList.size() > 0;
+	}
+
+	/** Returns the names of all streams that have at least one retrieved shard. */
+	public Set<String> getStreamsWithRetrievedShards() {
+		return streamsToRetrievedShardList.keySet();
+	}
+
+	/** Looks up the shard list of {@code stream}, creating and registering an empty one on first use. */
+	private LinkedList<KinesisStreamShard> shardListOf(String stream) {
+		LinkedList<KinesisStreamShard> shards = streamsToRetrievedShardList.get(stream);
+		if (shards == null) {
+			shards = new LinkedList<>();
+			streamsToRetrievedShardList.put(stream, shards);
+		}
+		return shards;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
new file mode 100644
index 0000000..9ffc8e6
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
+import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.LimitExceededException;
+import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
+import com.amazonaws.services.kinesis.model.StreamStatus;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Map;
+import java.util.Random;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Kinesis proxy implementation - a utility class that is used as a proxy to make
+ * calls to AWS Kinesis for several functions, such as getting a list of shards and
+ * fetching a batch of data records starting from a specified record sequence number.
+ *
+ * <p>NOTE:
+ * In the AWS KCL library, there is a similar implementation - {@link com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy}.
+ * This implementation differs mainly in that we can make operations to arbitrary Kinesis streams, which is a needed
+ * functionality for the Flink Kinesis Connector since the consumer may simultaneously read from multiple Kinesis streams.
+ *
+ * <p>Throttling errors ({@link ProvisionedThroughputExceededException}, {@link LimitExceededException}) are
+ * handled with "full jitter" exponential backoff, tuned through the {@link ConsumerConfigConstants}
+ * properties read in the constructor.
+ */
+public class KinesisProxy implements KinesisProxyInterface {
+
+	private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class);
+
+	/** The actual Kinesis client from the AWS SDK that we will be using to make calls. */
+	private final AmazonKinesisClient kinesisClient;
+
+	/** Random source used to calculate backoff jitter for Kinesis operations (shared by all instances). */
+	private static final Random seed = new Random();
+
+	// ------------------------------------------------------------------------
+	//  describeStream() related performance settings
+	// ------------------------------------------------------------------------
+
+	/** Base backoff millis for the describe stream operation. */
+	private final long describeStreamBaseBackoffMillis;
+
+	/** Maximum backoff millis for the describe stream operation. */
+	private final long describeStreamMaxBackoffMillis;
+
+	/** Exponential backoff power constant for the describe stream operation. */
+	private final double describeStreamExpConstant;
+
+	// ------------------------------------------------------------------------
+	//  getRecords() related performance settings
+	// ------------------------------------------------------------------------
+
+	/** Base backoff millis for the get records operation. */
+	private final long getRecordsBaseBackoffMillis;
+
+	/** Maximum backoff millis for the get records operation. */
+	private final long getRecordsMaxBackoffMillis;
+
+	/** Exponential backoff power constant for the get records operation. */
+	private final double getRecordsExpConstant;
+
+	/** Maximum number of retries after the initial get records call (at most this many + 1 calls are made). */
+	private final int getRecordsMaxAttempts;
+
+	// ------------------------------------------------------------------------
+	//  getShardIterator() related performance settings
+	// ------------------------------------------------------------------------
+
+	/** Base backoff millis for the get shard iterator operation. */
+	private final long getShardIteratorBaseBackoffMillis;
+
+	/** Maximum backoff millis for the get shard iterator operation. */
+	private final long getShardIteratorMaxBackoffMillis;
+
+	/** Exponential backoff power constant for the get shard iterator operation. */
+	private final double getShardIteratorExpConstant;
+
+	/** Maximum number of retries after the initial get shard iterator call (at most this many + 1 calls are made). */
+	private final int getShardIteratorMaxAttempts;
+
+	/**
+	 * Create a new KinesisProxy based on the supplied configuration properties.
+	 *
+	 * @param configProps configuration properties containing AWS credential and AWS region info, plus optional
+	 *                    backoff / retry overrides (defaults come from {@link ConsumerConfigConstants})
+	 * @throws NumberFormatException if a backoff or retry property is not a valid number
+	 */
+	private KinesisProxy(Properties configProps) {
+		checkNotNull(configProps);
+
+		this.kinesisClient = AWSUtil.createKinesisClient(configProps);
+
+		this.describeStreamBaseBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE,
+				Long.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE)));
+		this.describeStreamMaxBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX,
+				Long.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX)));
+		this.describeStreamExpConstant = Double.parseDouble(
+			configProps.getProperty(
+				ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT,
+				Double.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT)));
+
+		this.getRecordsBaseBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_BASE)));
+		this.getRecordsMaxBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_MAX)));
+		this.getRecordsExpConstant = Double.parseDouble(
+			configProps.getProperty(
+				ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT,
+				Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT)));
+		this.getRecordsMaxAttempts = Integer.parseInt(
+			configProps.getProperty(
+				ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_RETRIES)));
+
+		this.getShardIteratorBaseBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_BASE)));
+		this.getShardIteratorMaxBackoffMillis = Long.parseLong(
+			configProps.getProperty(
+				ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_MAX)));
+		this.getShardIteratorExpConstant = Double.parseDouble(
+			configProps.getProperty(
+				ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT,
+				Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT)));
+		this.getShardIteratorMaxAttempts = Integer.parseInt(
+			configProps.getProperty(
+				ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES,
+				Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_RETRIES)));
+
+	}
+
+	/**
+	 * Creates a Kinesis proxy.
+	 *
+	 * @param configProps configuration properties
+	 * @return the created kinesis proxy
+	 */
+	public static KinesisProxyInterface create(Properties configProps) {
+		return new KinesisProxy(configProps);
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException {
+		final GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
+		getRecordsRequest.setShardIterator(shardIterator);
+		getRecordsRequest.setLimit(maxRecordsToGet);
+
+		GetRecordsResult getRecordsResult = null;
+		ProvisionedThroughputExceededException lastException = null;
+
+		int attempt = 0;
+		while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) {
+			try {
+				getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
+			} catch (ProvisionedThroughputExceededException ex) {
+				lastException = ex;
+				if (attempt >= getRecordsMaxAttempts) {
+					// that was the last allowed call; fail right away instead of backing off for nothing
+					break;
+				}
+				long backoffMillis = fullJitterBackoff(
+					getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
+				LOG.warn("Got ProvisionedThroughputExceededException. Backing off for {} millis.", backoffMillis);
+				Thread.sleep(backoffMillis);
+			}
+		}
+
+		if (getRecordsResult == null) {
+			throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxAttempts +
+				" retry attempts returned ProvisionedThroughputExceededException.", lastException);
+		}
+
+		return getRecordsResult;
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) throws InterruptedException {
+		GetShardListResult result = new GetShardListResult();
+
+		for (Map.Entry<String, String> streamNameWithLastSeenShardId : streamNamesWithLastSeenShardIds.entrySet()) {
+			String stream = streamNameWithLastSeenShardId.getKey();
+			String lastSeenShardId = streamNameWithLastSeenShardId.getValue();
+			result.addRetrievedShardsToStream(stream, getShardsOfStream(stream, lastSeenShardId));
+		}
+		return result;
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException {
+		GetShardIteratorResult getShardIteratorResult = null;
+		ProvisionedThroughputExceededException lastException = null;
+
+		int attempt = 0;
+		while (attempt <= getShardIteratorMaxAttempts && getShardIteratorResult == null) {
+			try {
+				getShardIteratorResult =
+					kinesisClient.getShardIterator(shard.getStreamName(), shard.getShard().getShardId(), shardIteratorType, startingSeqNum);
+			} catch (ProvisionedThroughputExceededException ex) {
+				lastException = ex;
+				if (attempt >= getShardIteratorMaxAttempts) {
+					// that was the last allowed call; fail right away instead of backing off for nothing
+					break;
+				}
+				long backoffMillis = fullJitterBackoff(
+					getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, attempt++);
+				LOG.warn("Got ProvisionedThroughputExceededException. Backing off for {} millis.", backoffMillis);
+				Thread.sleep(backoffMillis);
+			}
+		}
+
+		if (getShardIteratorResult == null) {
+			throw new RuntimeException("Rate Exceeded for getShardIterator operation - all " + getShardIteratorMaxAttempts +
+				" retry attempts returned ProvisionedThroughputExceededException.", lastException);
+		}
+		return getShardIteratorResult.getShardIterator();
+	}
+
+	/**
+	 * Pages through describeStream results and collects the shards of a stream that come after the given shard id.
+	 *
+	 * @param streamName the stream whose shards should be listed
+	 * @param lastSeenShardId exclusive start shard id, or {@code null} to list from the first shard
+	 * @return the retrieved shards, in the order returned by Kinesis
+	 * @throws InterruptedException if interrupted while backing off in {@link #describeStream(String, String)}
+	 */
+	private List<KinesisStreamShard> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException {
+		List<KinesisStreamShard> shardsOfStream = new ArrayList<>();
+
+		DescribeStreamResult describeStreamResult;
+		do {
+			describeStreamResult = describeStream(streamName, lastSeenShardId);
+
+			List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
+			for (Shard shard : shards) {
+				shardsOfStream.add(new KinesisStreamShard(streamName, shard));
+			}
+
+			if (shards.isEmpty()) {
+				// Without a new last seen shard id, the next request would be identical to this one and the
+				// loop could spin forever; return what has been collected so far instead.
+				if (describeStreamResult.getStreamDescription().isHasMoreShards()) {
+					LOG.warn("describeStream for stream {} returned no shards after shard {} although more shards " +
+						"were reported; returning the shards retrieved so far.", streamName, lastSeenShardId);
+				}
+				break;
+			}
+			lastSeenShardId = shards.get(shards.size() - 1).getShardId();
+		} while (describeStreamResult.getStreamDescription().isHasMoreShards());
+
+		return shardsOfStream;
+	}
+
+	/**
+	 * Get metainfo for a Kinesis stream, which contains information about which shards this Kinesis stream possesses.
+	 *
+	 * <p>This method is using a "full jitter" approach described in AWS's article,
+	 * <a href="https://www.awsarchitectureblog.com/2015/03/backoff.html">"Exponential Backoff and Jitter"</a>.
+	 * This is necessary because concurrent calls will be made by all parallel subtasks' fetchers. This
+	 * jitter backoff approach will help distribute calls across the fetchers over time.
+	 *
+	 * <p>A {@link LimitExceededException} is retried without an attempt limit; a
+	 * {@link ResourceNotFoundException} is rethrown wrapped in a {@link RuntimeException}.
+	 *
+	 * @param streamName the stream to describe
+	 * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result)
+	 * @return the result of the describe stream operation
+	 * @throws InterruptedException if interrupted while backing off
+	 */
+	private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException {
+		final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
+		describeStreamRequest.setStreamName(streamName);
+		describeStreamRequest.setExclusiveStartShardId(startShardId);
+
+		DescribeStreamResult describeStreamResult = null;
+
+		// Call DescribeStream, with full-jitter backoff (if we get LimitExceededException).
+		int attemptCount = 0;
+		while (describeStreamResult == null) { // retry until we get a result
+			try {
+				describeStreamResult = kinesisClient.describeStream(describeStreamRequest);
+			} catch (LimitExceededException le) {
+				long backoffMillis = fullJitterBackoff(
+					describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++);
+				LOG.warn("Got LimitExceededException when describing stream {}. Backing off for {} millis.",
+					streamName, backoffMillis);
+				Thread.sleep(backoffMillis);
+			} catch (ResourceNotFoundException re) {
+				throw new RuntimeException("Error while getting stream details", re);
+			}
+		}
+
+		// constant-first comparisons so that a missing status is reported instead of causing an NPE
+		String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus();
+		if (!StreamStatus.ACTIVE.toString().equals(streamStatus) && !StreamStatus.UPDATING.toString().equals(streamStatus)) {
+			LOG.warn("The status of stream {} is {}; result of the current describeStream operation will not " +
+				"contain any shard information.", streamName, streamStatus);
+		}
+
+		// Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before the exclusive
+		// start shard id in the returned shards list; check if we need to remove these erroneously returned shards
+		if (startShardId != null) {
+			List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
+			Iterator<Shard> shardItr = shards.iterator();
+			while (shardItr.hasNext()) {
+				if (KinesisStreamShard.compareShardIds(shardItr.next().getShardId(), startShardId) <= 0) {
+					shardItr.remove();
+				}
+			}
+		}
+
+		return describeStreamResult;
+	}
+
+	/**
+	 * Computes a "full jitter" backoff: a random number of millis between 0 and
+	 * {@code min(max, base * power^attempt)}.
+	 *
+	 * @param base base backoff millis
+	 * @param max upper bound for the exponential part, in millis
+	 * @param power exponential backoff power constant
+	 * @param attempt zero-based index of the failed attempt
+	 * @return the number of millis to sleep
+	 */
+	private static long fullJitterBackoff(long base, long max, double power, int attempt) {
+		long exponentialBackoff = (long) Math.min(max, base * Math.pow(power, attempt));
+		return (long) (seed.nextDouble() * exponentialBackoff); // random jitter between 0 and the exponential backoff
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
new file mode 100644
index 0000000..39ddc52
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+
+import java.util.Map;
+
+/**
+ * Interface for a Kinesis proxy that operates on multiple Kinesis streams within the same AWS service region.
+ */
+public interface KinesisProxyInterface {
+
+	/**
+	 * Get a shard iterator from the specified position in a shard.
+	 * The retrieved shard iterator can be used in {@link KinesisProxyInterface#getRecords(String, int)}
+	 * to read data from the Kinesis shard.
+	 *
+	 * @param shard the shard to get the iterator for
+	 * @param shardIteratorType the iterator type, defining how the shard is to be iterated
+	 *                          (one of: TRIM_HORIZON, LATEST, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER)
+	 * @param startingSeqNum sequence number, must be null if shardIteratorType is TRIM_HORIZON or LATEST
+	 * @return shard iterator which can be used to read data from Kinesis
+	 * @throws InterruptedException this method will retry with backoff if AWS Kinesis complains that the
+	 *                              operation has exceeded the rate limit; this exception will be thrown
+	 *                              if the backoff is interrupted.
+	 */
+	String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) throws InterruptedException;
+
+	/**
+	 * Get the next batch of data records using a specific shard iterator.
+	 *
+	 * @param shardIterator a shard iterator that encodes info about which shard to read and where to start reading
+	 * @param maxRecordsToGet the maximum amount of records to retrieve for this batch
+	 * @return the batch of retrieved records, also with a shard iterator that can be used to get the next batch
+	 * @throws InterruptedException this method will retry with backoff if AWS Kinesis complains that the
+	 *                              operation has exceeded the rate limit; this exception will be thrown
+	 *                              if the backoff is interrupted.
+	 */
+	GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException;
+
+	/**
+	 * Get shard list of multiple Kinesis streams, ignoring the
+	 * shards of each stream before a specified last seen shard id.
+	 *
+	 * @param streamNamesWithLastSeenShardIds a map with stream as key, and last seen shard id as value
+	 * @return result of the shard list query
+	 * @throws InterruptedException this method will retry with backoff if AWS Kinesis complains that the
+	 *                              operation has exceeded the rate limit; this exception will be thrown
+	 *                              if the backoff is interrupted.
+	 */
+	GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) throws InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java
new file mode 100644
index 0000000..0effdd8
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.serialization;
+
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * This is a deserialization schema specific for the Flink Kinesis Consumer. Different from the
+ * basic {@link DeserializationSchema}, this schema offers additional Kinesis-specific information
+ * about the record that may be useful to the user application.
+ *
+ * @param <T> The type created by this deserialization schema.
+ */
+public interface KinesisDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+
+	/**
+	 * Deserializes a Kinesis record's bytes.
+	 *
+	 * @param recordValue the record's value as a byte array
+	 * @param partitionKey the record's partition key at the time of writing
+	 * @param seqNum the sequence number of this record in the Kinesis shard
+	 * @param approxArrivalTimestamp the server-side timestamp of when Kinesis received and stored the record
+	 * @param stream the name of the Kinesis stream that this record was sent to
+	 * @param shardId The identifier of the shard the record was sent to
+	 * @return the deserialized message as a Java object
+	 * @throws IOException if the record cannot be deserialized
+	 */
+	T deserialize(byte[] recordValue, String partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String shardId) throws IOException;
+
+	// TODO FLINK-4194 ADD SUPPORT FOR boolean isEndOfStream(T nextElement);
+	// Planned contract: decide whether the element signals the end of the stream. If true is
+	// returned, the element won't be emitted.
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java
new file mode 100644
index 0000000..6e66038
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.serialization;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import java.io.IOException;
+
+/**
+ * Adapter that exposes a plain {@link DeserializationSchema} through the {@link KinesisDeserializationSchema}
+ * interface. Only the record bytes are forwarded; the Kinesis-specific metadata is ignored.
+ *
+ * @param <T> The type created by the deserialization schema.
+ */
+public class KinesisDeserializationSchemaWrapper<T> implements KinesisDeserializationSchema<T> {
+	private static final long serialVersionUID = 9143148962928375886L;
+
+	/** The wrapped schema that performs the actual deserialization. */
+	private final DeserializationSchema<T> deserializationSchema;
+
+	public KinesisDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema) {
+		this.deserializationSchema = deserializationSchema;
+	}
+
+	/** Reports the type produced by the wrapped schema. */
+	@Override
+	public TypeInformation<T> getProducedType() {
+		return this.deserializationSchema.getProducedType();
+	}
+
+	/**
+	 * Hands {@code recordValue} to the wrapped schema; the partition key, sequence number, arrival
+	 * timestamp, stream name and shard id are not used.
+	 */
+	@Override
+	public T deserialize(byte[] recordValue, String partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String shardId)
+		throws IOException {
+		final DeserializationSchema<T> delegate = this.deserializationSchema;
+		return delegate.deserialize(recordValue);
+	}
+
+	// FLINK-4194: once end-of-stream support is added to KinesisDeserializationSchema, delegate it too:
+	//
+	// @Override
+	// public boolean isEndOfStream(T nextElement) {
+	// 	return deserializationSchema.isEndOfStream(nextElement);
+	// }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisSerializationSchema.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisSerializationSchema.java
new file mode 100644
index 0000000..03dd72c
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisSerializationSchema.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kinesis.serialization;
+
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+
+/**
+ * Kinesis-specific serialization schema, allowing users to specify a target stream based
+ * on a record's contents.
+ *
+ * @param <T> the type of the elements to serialize
+ */
+public interface KinesisSerializationSchema<T> extends Serializable {
+	/**
+	 * Serialize the given element into a ByteBuffer.
+	 *
+	 * @param element The element to serialize
+	 * @return Serialized representation of the element
+	 */
+	ByteBuffer serialize(T element);
+
+	/**
+	 * Optional method to determine the target stream based on the element.
+	 * Return {@code null} to use the default stream.
+	 *
+	 * @param element The element to determine the target stream from
+	 * @return target stream name, or {@code null} to use the default stream
+	 */
+	String getTargetStream(T element);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
new file mode 100644
index 0000000..cff69e5
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ClientConfigurationFactory;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
+import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
+import com.amazonaws.auth.profile.ProfileCredentialsProvider;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider;
+
+import java.util.Locale;
+import java.util.Properties;
+
+/**
+ * Some utilities specific to Amazon Web Service.
+ */
+public class AWSUtil {
+
+	/**
+	 * Creates an Amazon Kinesis Client.
+	 *
+	 * <p>The client uses a Flink-specific user agent and obtains credentials from
+	 * {@link #getCredentialsProvider(Properties)}. It is bound to the configured region; the region
+	 * ID is matched case-insensitively, consistent with {@link #isValidRegion(String)}. If an
+	 * endpoint is configured, it is applied after the region.
+	 *
+	 * @param configProps configuration properties containing the region, the optional endpoint, and
+	 *                    the credential provider settings
+	 * @return a new Amazon Kinesis Client
+	 */
+	public static AmazonKinesisClient createKinesisClient(Properties configProps) {
+		// set a Flink-specific user agent
+		ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig();
+		awsClientConfig.setUserAgent("Apache Flink " + EnvironmentInformation.getVersion() +
+			" (" + EnvironmentInformation.getRevisionInformation().commitId + ") Kinesis Connector");
+
+		// utilize automatic refreshment of credentials by directly passing the AWSCredentialsProvider
+		AmazonKinesisClient client = new AmazonKinesisClient(
+			AWSUtil.getCredentialsProvider(configProps), awsClientConfig);
+
+		// normalize exactly like isValidRegion() does. Regions.fromName() matches the lower-case
+		// region ID, so e.g. "US-EAST-1" would otherwise pass validation but be rejected here.
+		String regionName = normalizeRegionName(configProps.getProperty(AWSConfigConstants.AWS_REGION));
+		client.setRegion(Region.getRegion(Regions.fromName(regionName)));
+		if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
+			client.setEndpoint(configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT));
+		}
+		return client;
+	}
+
+	/**
+	 * Return a {@link AWSCredentialsProvider} instance corresponding to the configuration properties.
+	 *
+	 * <p>The provider type is read from {@link AWSConfigConstants#AWS_CREDENTIALS_PROVIDER} and
+	 * defaults to {@code BASIC} when absent. For {@code BASIC}, the access key ID and secret key are
+	 * read from the given properties each time {@code getCredentials()} is called.
+	 *
+	 * @param configProps the configuration properties
+	 * @return The corresponding AWS Credentials Provider instance
+	 * @throws IllegalArgumentException if the configured provider type is not a {@link CredentialProvider} name
+	 */
+	public static AWSCredentialsProvider getCredentialsProvider(final Properties configProps) {
+		CredentialProvider credentialProviderType = CredentialProvider.valueOf(configProps.getProperty(
+			AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, CredentialProvider.BASIC.toString()));
+
+		AWSCredentialsProvider credentialsProvider;
+
+		switch (credentialProviderType) {
+			case ENV_VAR:
+				credentialsProvider = new EnvironmentVariableCredentialsProvider();
+				break;
+			case SYS_PROP:
+				credentialsProvider = new SystemPropertiesCredentialsProvider();
+				break;
+			case PROFILE:
+				String profileName = configProps.getProperty(
+					AWSConfigConstants.AWS_PROFILE_NAME, null);
+				String profileConfigPath = configProps.getProperty(
+					AWSConfigConstants.AWS_PROFILE_PATH, null);
+				credentialsProvider = (profileConfigPath == null)
+					? new ProfileCredentialsProvider(profileName)
+					: new ProfileCredentialsProvider(profileConfigPath, profileName);
+				break;
+			case AUTO:
+				credentialsProvider = new DefaultAWSCredentialsProviderChain();
+				break;
+			case BASIC:
+			default:
+				credentialsProvider = new AWSCredentialsProvider() {
+					@Override
+					public AWSCredentials getCredentials() {
+						return new BasicAWSCredentials(
+							configProps.getProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID),
+							configProps.getProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY));
+					}
+
+					@Override
+					public void refresh() {
+						// nothing to refresh: credentials are re-read from the properties on every call
+					}
+				};
+				break;
+		}
+
+		return credentialsProvider;
+	}
+
+	/**
+	 * Checks whether or not a region ID is valid. The check is case-insensitive.
+	 *
+	 * @param region The AWS region ID to check; may be null
+	 * @return true if the supplied region ID is valid, false otherwise (including for null)
+	 */
+	public static boolean isValidRegion(String region) {
+		if (region == null) {
+			return false;
+		}
+		try {
+			Regions.fromName(normalizeRegionName(region));
+			return true;
+		} catch (IllegalArgumentException e) {
+			return false;
+		}
+	}
+
+	/**
+	 * Lower-cases a region ID using {@link Locale#ROOT}, so the result does not depend on the
+	 * JVM's default locale. A null input is passed through unchanged.
+	 */
+	private static String normalizeRegionName(String region) {
+		return (region == null) ? null : region.toLowerCase(Locale.ROOT);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
new file mode 100644
index 0000000..9aa14ad
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import com.amazonaws.regions.Regions;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
+import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
+
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utilities for Flink Kinesis connector configuration.
+ */
+public class KinesisConfigUtil {
+
+	/**
+	 * Validate configuration properties for {@link FlinkKinesisConsumer}.
+	 */
+	public static void validateConsumerConfiguration(Properties config) {
+		checkNotNull(config, "config can not be null");
+
+		validateAwsConfiguration(config);
+
+		if (config.containsKey(ConsumerConfigConstants.STREAM_INITIAL_POSITION)) {
+			String initPosType = config.getProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION);
+
+			// specified initial position in stream must be either LATEST or TRIM_HORIZON
+			try {
+				InitialPosition.valueOf(initPosType);
+			} catch (IllegalArgumentException e) {
+				StringBuilder sb = new StringBuilder();
+				for (InitialPosition pos : InitialPosition.values()) {
+					sb.append(pos.toString()).append(", ");
+				}
+				throw new IllegalArgumentException("Invalid initial position in stream set in config. Valid values are: " + sb.toString());
+			}
+		}
+
+		validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
+			"Invalid value given for maximum records per getRecords shard operation. Must be a valid non-negative integer value.");
+
+		validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES,
+			"Invalid value given for maximum retry attempts for getRecords shard operation. Must be a valid non-negative integer value.");
+
+		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE,
+			"Invalid value given for get records operation base backoff milliseconds. Must be a valid non-negative long value.");
+
+		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX,
+			"Invalid value given for get records operation max backoff milliseconds. Must be a valid non-negative long value.");
+
+		validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT,
+			"Invalid value given for get records operation backoff exponential constant. Must be a valid non-negative double value.");
+
+		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
+			"Invalid value given for getRecords sleep interval in milliseconds. Must be a valid non-negative long value.");
+
+		validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES,
+			"Invalid value given for maximum retry attempts for getShardIterator shard operation. Must be a valid non-negative integer value.");
+
+		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE,
+			"Invalid value given for get shard iterator operation base backoff milliseconds. Must be a valid non-negative long value.");
+
+		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX,
+			"Invalid value given for get shard iterator operation max backoff milliseconds. Must be a valid non-negative long value.");
+
+		validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT,
+			"Invalid value given for get shard iterator operation backoff exponential constant. Must be a valid non-negative double value.");
+
+		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS,
+			"Invalid value given for shard discovery sleep interval in milliseconds. Must be a valid non-negative long value.");
+
+		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE,
+			"Invalid value given for describe stream operation base backoff milliseconds. Must be a valid non-negative long value.");
+
+		validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX,
+			"Invalid value given for describe stream operation max backoff milliseconds. Must be a valid non-negative long value.");
+
+		validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT,
+			"Invalid value given for describe stream operation backoff exponential constant. Must be a valid non-negative double value.");
+
+		if (config.containsKey(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS)) {
+			checkArgument(
+				Long.parseLong(config.getProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS))
+					< ConsumerConfigConstants.MAX_SHARD_GETRECORDS_INTERVAL_MILLIS,
+				"Invalid value given for getRecords sleep interval in milliseconds. Must be lower than " +
+					ConsumerConfigConstants.MAX_SHARD_GETRECORDS_INTERVAL_MILLIS + " milliseconds."
+			);
+		}
+	}
+
+	/**
+	 * Validate configuration properties for {@link FlinkKinesisProducer}.
+	 */
+	public static void validateProducerConfiguration(Properties config) {
+		checkNotNull(config, "config can not be null");
+
+		validateAwsConfiguration(config);
+
+		validateOptionalPositiveLongProperty(config, ProducerConfigConstants.COLLECTION_MAX_COUNT,
+			"Invalid value given for maximum number of items to pack into a PutRecords request. Must be a valid non-negative long value.");
+
+		validateOptionalPositiveLongProperty(config, ProducerConfigConstants.AGGREGATION_MAX_COUNT,
+			"Invalid value given for maximum number of items to pack into an aggregated record. Must be a valid non-negative long value.");
+	}
+
+	/**
+	 * Validate configuration properties related to Amazon AWS service
+	 */
+	public static void validateAwsConfiguration(Properties config) {
+		if (!config.containsKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)) {
+			// if the credential provider type is not specified, it will default to BASIC later on,
+			// so the Access Key ID and Secret Key must be given
+			if (!config.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID)
+				|| !config.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)) {
+				throw new IllegalArgumentException("Please set values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " +
+					"and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.");
+			}
+		} else {
+			String credentialsProviderType = config.getProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER);
+
+			// value specified for AWSConfigConstants.AWS_CREDENTIALS_PROVIDER needs to be recognizable
+			CredentialProvider providerType;
+			try {
+				providerType = CredentialProvider.valueOf(credentialsProviderType);
+			} catch (IllegalArgumentException e) {
+				StringBuilder sb = new StringBuilder();
+				for (CredentialProvider type : CredentialProvider.values()) {
+					sb.append(type.toString()).append(", ");
+				}
+				throw new IllegalArgumentException("Invalid AWS Credential Provider Type set in config. Valid values are: " + sb.toString());
+			}
+
+			// if BASIC type is used, also check that the Access Key ID and Secret Key is supplied
+			if (providerType == CredentialProvider.BASIC) {
+				if (!config.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID)
+					|| !config.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)) {
+					throw new IllegalArgumentException("Please set values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " +
+						"and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.");
+				}
+			}
+		}
+
+		if (!config.containsKey(AWSConfigConstants.AWS_REGION)) {
+			throw new IllegalArgumentException("The AWS region ('" + AWSConfigConstants.AWS_REGION + "') must be set in the config.");
+		} else {
+			// specified AWS Region name must be recognizable
+			if (!AWSUtil.isValidRegion(config.getProperty(AWSConfigConstants.AWS_REGION))) {
+				StringBuilder sb = new StringBuilder();
+				for (Regions region : Regions.values()) {
+					sb.append(region.getName()).append(", ");
+				}
+				throw new IllegalArgumentException("Invalid AWS region set in config. Valid values are: " + sb.toString());
+			}
+		}
+	}
+
+	private static void validateOptionalPositiveLongProperty(Properties config, String key, String message) {
+		if (config.containsKey(key)) {
+			try {
+				long value = Long.parseLong(config.getProperty(key));
+				if (value < 0) {
+					throw new NumberFormatException();
+				}
+			} catch (NumberFormatException e) {
+				throw new IllegalArgumentException(message);
+			}
+		}
+	}
+
+	private static void validateOptionalPositiveIntProperty(Properties config, String key, String message) {
+		if (config.containsKey(key)) {
+			try {
+				int value = Integer.parseInt(config.getProperty(key));
+				if (value < 0) {
+					throw new NumberFormatException();
+				}
+			} catch (NumberFormatException e) {
+				throw new IllegalArgumentException(message);
+			}
+		}
+	}
+
+	private static void validateOptionalPositiveDoubleProperty(Properties config, String key, String message) {
+		if (config.containsKey(key)) {
+			try {
+				double value = Double.parseDouble(config.getProperty(key));
+				if (value < 0) {
+					throw new NumberFormatException();
+				}
+			} catch (NumberFormatException e) {
+				throw new IllegalArgumentException(message);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/resources/log4j.properties b/flink-connectors/flink-connector-kinesis/src/main/resources/log4j.properties
new file mode 100644
index 0000000..773f932
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/resources/log4j.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger