You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:34:52 UTC

[02/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
deleted file mode 100644
index a06fdca..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ /dev/null
@@ -1,679 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.internals;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
-import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
-import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
-import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
-import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
-import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
-import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
-import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
-import org.apache.flink.util.InstantiationUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A KinesisDataFetcher is responsible for fetching data from multiple Kinesis shards. Each parallel subtask instantiates
- * and runs a single fetcher throughout the subtask's lifetime. The fetcher accomplishes the following:
- * <ul>
- *     <li>1. continuously poll Kinesis to discover shards that the subtask should subscribe to. The subscribed subset
- *     		  of shards, including future new shards, is non-overlapping across subtasks (no two subtasks will be
- *     		  subscribed to the same shard) and determinate across subtask restores (the subtask will always subscribe
- *     		  to the same subset of shards even after restoring)</li>
- *     <li>2. decide where in each discovered shard should the fetcher start subscribing to</li>
- *     <li>3. subscribe to shards by creating a single thread for each shard</li>
- * </ul>
- *
- * <p>The fetcher manages two states: 1) last seen shard ids of each subscribed stream (used for continuous shard discovery),
- * and 2) last processed sequence numbers of each subscribed shard. Since operations on the second state will be performed
- * by multiple threads, these operations should only be done using the handler methods provided in this class.
- */
-public class KinesisDataFetcher<T> {
-
-	private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class);
-
-	// ------------------------------------------------------------------------
-	//  Consumer-wide settings
-	// ------------------------------------------------------------------------
-
-	/** Configuration properties for the Flink Kinesis Consumer */
-	private final Properties configProps;
-
-	/** The list of Kinesis streams that the consumer is subscribing to */
-	private final List<String> streams;
-
-	/**
-	 * The deserialization schema we will be using to convert Kinesis records to Flink objects.
-	 * Note that since this might not be thread-safe, {@link ShardConsumer}s using this must
-	 * clone a copy using {@link KinesisDataFetcher#getClonedDeserializationSchema()}.
-	 */
-	private final KinesisDeserializationSchema<T> deserializationSchema;
-
-	// ------------------------------------------------------------------------
-	//  Subtask-specific settings
-	// ------------------------------------------------------------------------
-
-	/** Runtime context of the subtask that this fetcher was created in */
-	private final RuntimeContext runtimeContext;
-
-	private final int totalNumberOfConsumerSubtasks;
-
-	private final int indexOfThisConsumerSubtask;
-
-	/**
-	 * This flag should be set by {@link FlinkKinesisConsumer} using
-	 * {@link KinesisDataFetcher#setIsRestoringFromFailure(boolean)}
-	 */
-	private boolean isRestoredFromFailure;
-
-	// ------------------------------------------------------------------------
-	//  Executor services to run created threads
-	// ------------------------------------------------------------------------
-
-	/** Executor service to run {@link ShardConsumer}s to consume Kinesis shards */
-	private final ExecutorService shardConsumersExecutor;
-
-	// ------------------------------------------------------------------------
-	//  Managed state, accessed and updated across multiple threads
-	// ------------------------------------------------------------------------
-
-	/** The last discovered shard ids of each subscribed stream, updated as the fetcher discovers new shards in.
-	 * Note: this state will be updated if new shards are found when {@link KinesisDataFetcher#discoverNewShardsToSubscribe()} is called.
-	 */
-	private final Map<String, String> subscribedStreamsToLastDiscoveredShardIds;
-
-	/**
-	 * The shards, along with their last processed sequence numbers, that this fetcher is subscribed to. The fetcher
-	 * will add new subscribed shard states to this list as it discovers new shards. {@link ShardConsumer} threads update
-	 * the last processed sequence number of subscribed shards as they fetch and process records.
-	 *
-	 * <p>Note that since multiple {@link ShardConsumer} threads will be performing operations on this list, all operations
-	 * must be wrapped in synchronized blocks on the {@link KinesisDataFetcher#checkpointLock} lock. For this purpose,
-	 * all threads must use the following thread-safe methods this class provides to operate on this list:
-	 * <ul>
-	 *     <li>{@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}</li>
-	 *     <li>{@link KinesisDataFetcher#updateState(int, SequenceNumber)}</li>
-	 *     <li>{@link KinesisDataFetcher#emitRecordAndUpdateState(T, long, int, SequenceNumber)}</li>
-	 * </ul>
-	 */
-	private final List<KinesisStreamShardState> subscribedShardsState;
-
-	private final SourceFunction.SourceContext<T> sourceContext;
-
-	/** Checkpoint lock, also used to synchronize operations on subscribedShardsState */
-	private final Object checkpointLock;
-
-	/** Reference to the first error thrown by any of the {@link ShardConsumer} threads */
-	private final AtomicReference<Throwable> error;
-
-	/** The Kinesis proxy that the fetcher will be using to discover new shards */
-	private final KinesisProxyInterface kinesis;
-
-	/** Thread that executed runFetcher() */
-	private Thread mainThread;
-
-	/**
-	 * The current number of shards that are actively read by this fetcher.
-	 *
-	 * This value is updated in {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)},
-	 * and {@link KinesisDataFetcher#updateState(int, SequenceNumber)}.
-	 */
-	private final AtomicInteger numberOfActiveShards = new AtomicInteger(0);
-
-	private volatile boolean running = true;
-
-	/**
-	 * Creates a Kinesis Data Fetcher.
-	 *
-	 * @param streams the streams to subscribe to
-	 * @param sourceContext context of the source function
-	 * @param runtimeContext this subtask's runtime context
-	 * @param configProps the consumer configuration properties
-	 * @param deserializationSchema deserialization schema
-	 */
-	public KinesisDataFetcher(List<String> streams,
-							SourceFunction.SourceContext<T> sourceContext,
-							RuntimeContext runtimeContext,
-							Properties configProps,
-							KinesisDeserializationSchema<T> deserializationSchema) {
-		this(streams,
-			sourceContext,
-			sourceContext.getCheckpointLock(),
-			runtimeContext,
-			configProps,
-			deserializationSchema,
-			new AtomicReference<Throwable>(),
-			new LinkedList<KinesisStreamShardState>(),
-			createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
-			KinesisProxy.create(configProps));
-	}
-
-	/** This constructor is exposed for testing purposes */
-	protected KinesisDataFetcher(List<String> streams,
-								SourceFunction.SourceContext<T> sourceContext,
-								Object checkpointLock,
-								RuntimeContext runtimeContext,
-								Properties configProps,
-								KinesisDeserializationSchema<T> deserializationSchema,
-								AtomicReference<Throwable> error,
-								LinkedList<KinesisStreamShardState> subscribedShardsState,
-								HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds,
-								KinesisProxyInterface kinesis) {
-		this.streams = checkNotNull(streams);
-		this.configProps = checkNotNull(configProps);
-		this.sourceContext = checkNotNull(sourceContext);
-		this.checkpointLock = checkNotNull(checkpointLock);
-		this.runtimeContext = checkNotNull(runtimeContext);
-		this.totalNumberOfConsumerSubtasks = runtimeContext.getNumberOfParallelSubtasks();
-		this.indexOfThisConsumerSubtask = runtimeContext.getIndexOfThisSubtask();
-		this.deserializationSchema = checkNotNull(deserializationSchema);
-		this.kinesis = checkNotNull(kinesis);
-
-		this.error = checkNotNull(error);
-		this.subscribedShardsState = checkNotNull(subscribedShardsState);
-		this.subscribedStreamsToLastDiscoveredShardIds = checkNotNull(subscribedStreamsToLastDiscoveredShardIds);
-
-		this.shardConsumersExecutor =
-			createShardConsumersThreadPool(runtimeContext.getTaskNameWithSubtasks());
-	}
-
-	/**
-	 * Starts the fetcher. After starting the fetcher, it can only
-	 * be stopped by calling {@link KinesisDataFetcher#shutdownFetcher()}.
-	 *
-	 * @throws Exception the first error or exception thrown by the fetcher or any of the threads created by the fetcher.
-	 */
-	public void runFetcher() throws Exception {
-
-		// check that we are running before proceeding
-		if (!running) {
-			return;
-		}
-
-		this.mainThread = Thread.currentThread();
-
-		// ------------------------------------------------------------------------
-		//  Procedures before starting the infinite while loop:
-		// ------------------------------------------------------------------------
-
-		//  1. query for any new shards that may have been created while the Kinesis consumer was not running,
-		//     and register them to the subscribedShardState list.
-		if (LOG.isDebugEnabled()) {
-			String logFormat = (!isRestoredFromFailure)
-				? "Subtask {} is trying to discover initial shards ..."
-				: "Subtask {} is trying to discover any new shards that were created while the consumer wasn't " +
-				"running due to failure ...";
-
-			LOG.debug(logFormat, indexOfThisConsumerSubtask);
-		}
-		List<KinesisStreamShard> newShardsCreatedWhileNotRunning = discoverNewShardsToSubscribe();
-		for (KinesisStreamShard shard : newShardsCreatedWhileNotRunning) {
-			// the starting state for new shards created while the consumer wasn't running depends on whether or not
-			// we are starting fresh (not restoring from a checkpoint); when we are starting fresh, this simply means
-			// all existing shards of streams we are subscribing to are new shards; when we are restoring from checkpoint,
-			// any new shards due to Kinesis resharding from the time of the checkpoint will be considered new shards.
-			InitialPosition initialPosition = InitialPosition.valueOf(configProps.getProperty(
-				ConsumerConfigConstants.STREAM_INITIAL_POSITION, ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION));
-
-			SentinelSequenceNumber startingStateForNewShard = (isRestoredFromFailure)
-				? SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM
-				: initialPosition.toSentinelSequenceNumber();
-
-			if (LOG.isInfoEnabled()) {
-				String logFormat = (!isRestoredFromFailure)
-					? "Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}"
-					: "Subtask {} will be seeded with new shard {} that was created while the consumer wasn't " +
-					"running due to failure, starting state set as sequence number {}";
-
-				LOG.info(logFormat, indexOfThisConsumerSubtask, shard.toString(), startingStateForNewShard.get());
-			}
-			registerNewSubscribedShardState(new KinesisStreamShardState(shard, startingStateForNewShard.get()));
-		}
-
-		//  2. check that there is at least one shard in the subscribed streams to consume from (can be done by
-		//     checking if at least one value in subscribedStreamsToLastDiscoveredShardIds is not null)
-		boolean hasShards = false;
-		StringBuilder streamsWithNoShardsFound = new StringBuilder();
-		for (Map.Entry<String, String> streamToLastDiscoveredShardEntry : subscribedStreamsToLastDiscoveredShardIds.entrySet()) {
-			if (streamToLastDiscoveredShardEntry.getValue() != null) {
-				hasShards = true;
-			} else {
-				streamsWithNoShardsFound.append(streamToLastDiscoveredShardEntry.getKey()).append(", ");
-			}
-		}
-
-		if (streamsWithNoShardsFound.length() != 0 && LOG.isWarnEnabled()) {
-			LOG.warn("Subtask {} has failed to find any shards for the following subscribed streams: {}",
-				indexOfThisConsumerSubtask, streamsWithNoShardsFound.toString());
-		}
-
-		if (!hasShards) {
-			throw new RuntimeException("No shards can be found for all subscribed streams: " + streams);
-		}
-
-		//  3. start consuming any shard state we already have in the subscribedShardState up to this point; the
-		//     subscribedShardState may already be seeded with values due to step 1., or explicitly added by the
-		//     consumer using a restored state checkpoint
-		for (int seededStateIndex = 0; seededStateIndex < subscribedShardsState.size(); seededStateIndex++) {
-			KinesisStreamShardState seededShardState = subscribedShardsState.get(seededStateIndex);
-
-			// only start a consuming thread if the seeded subscribed shard has not been completely read already
-			if (!seededShardState.getLastProcessedSequenceNum().equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
-
-				if (LOG.isInfoEnabled()) {
-					LOG.info("Subtask {} will start consuming seeded shard {} from sequence number {} with ShardConsumer {}",
-						indexOfThisConsumerSubtask, seededShardState.getKinesisStreamShard().toString(),
-						seededShardState.getLastProcessedSequenceNum(), seededStateIndex);
-					}
-
-				shardConsumersExecutor.submit(
-					new ShardConsumer<>(
-						this,
-						seededStateIndex,
-						subscribedShardsState.get(seededStateIndex).getKinesisStreamShard(),
-						subscribedShardsState.get(seededStateIndex).getLastProcessedSequenceNum()));
-			}
-		}
-
-		// ------------------------------------------------------------------------
-
-		// finally, start the infinite shard discovery and consumer launching loop;
-		// we will escape from this loop only when shutdownFetcher() or stopWithError() is called
-
-		final long discoveryIntervalMillis = Long.valueOf(
-			configProps.getProperty(
-				ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS,
-				Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS)));
-
-		// FLINK-4341:
-		// For downstream operators that work on time (ex. window operators), we are required to emit a max value watermark
-		// for subtasks that won't continue to have shards to read from unless resharding happens in the future, otherwise
-		// the downstream watermarks would not advance, leading to unbounded accumulating state.
-		//
-		// The side-effect of this limitation is that on resharding, we must fail hard if the newly discovered shard
-		// is to be subscribed by a subtask that has previously emitted a max value watermark, otherwise the watermarks
-		// will be messed up.
-		//
-		// There are 2 cases were we need to either emit a max value watermark, or deliberately fail hard:
-		//  (a) if this subtask has no more shards to read from unless resharding happens in the future, we emit a max
-		//      value watermark. This case is encountered when 1) all previously read shards by this subtask were closed
-		//      due to resharding, 2) when this subtask was initially only subscribed to closed shards while the consumer
-		//      was told to start from TRIM_HORIZON, or 3) there was initially no shards for this subtask to read on startup.
-		//  (b) this subtask has discovered new shards to read from due to a reshard; if this subtask has already emitted
-		//      a max value watermark, we must deliberately fail hard to avoid messing up the watermarks. The new shards
-		//      will be subscribed by this subtask after restore as initial shards on startup.
-		//
-		// TODO: This is a temporary workaround until a min-watermark information service is available in the JobManager
-		// Please see FLINK-4341 for more detail
-
-		boolean emittedMaxValueWatermark = false;
-
-		if (this.numberOfActiveShards.get() == 0) {
-			// FLINK-4341 workaround case (a) - please see the above for details on this case
-			LOG.info("Subtask {} has no initial shards to read on startup; emitting max value watermark ...",
-				indexOfThisConsumerSubtask);
-			sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
-			emittedMaxValueWatermark = true;
-		}
-
-		while (running) {
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Subtask {} is trying to discover new shards that were created due to resharding ...",
-					indexOfThisConsumerSubtask);
-			}
-			List<KinesisStreamShard> newShardsDueToResharding = discoverNewShardsToSubscribe();
-
-			// -- NOTE: Potential race condition between newShardsDueToResharding and numberOfActiveShards --
-			// Since numberOfActiveShards is updated by parallel shard consuming threads in updateState(), there exists
-			// a race condition with the currently queried newShardsDueToResharding. Therefore, numberOfActiveShards
-			// may not correctly reflect the discover result in the below case determination. This may lead to incorrect
-			// case determination on the current discovery attempt, but can still be correctly handled on future attempts.
-			//
-			// Although this can be resolved by wrapping the current shard discovery attempt with the below
-			// case determination within a synchronized block on the checkpoint lock for atomicity, there will be
-			// considerable throughput performance regression as shard discovery is a remote call to AWS. Therefore,
-			// since the case determination is a temporary workaround for FLINK-4341, the race condition is tolerable as
-			// we can still eventually handle max value watermark emitting / deliberately failing on successive
-			// discovery attempts.
-
-			if (newShardsDueToResharding.size() == 0 && this.numberOfActiveShards.get() == 0 && !emittedMaxValueWatermark) {
-				// FLINK-4341 workaround case (a) - please see the above for details on this case
-				LOG.info("Subtask {} has completed reading all shards; emitting max value watermark ...",
-					indexOfThisConsumerSubtask);
-				sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
-				emittedMaxValueWatermark = true;
-			} else if (newShardsDueToResharding.size() > 0 && emittedMaxValueWatermark) {
-				// FLINK-4341 workaround case (b) - please see the above for details on this case
-				//
-				// Note that in the case where on resharding this subtask ceased to read all of it's previous shards
-				// but new shards is also to be subscribed by this subtask immediately after, emittedMaxValueWatermark
-				// will be false; this allows the fetcher to continue reading the new shards without failing on such cases.
-				// However, due to the race condition mentioned above, we might still fall into case (a) first, and
-				// then (b) on the next discovery attempt. Although the failure is ideally unnecessary, max value
-				// watermark emitting still remains to be correct.
-
-				LOG.warn("Subtask {} has discovered {} new shards to subscribe, but is failing hard to avoid messing" +
-						" up watermarks; the new shards will be subscribed by this subtask after restore ...",
-					indexOfThisConsumerSubtask, newShardsDueToResharding.size());
-				throw new RuntimeException("Deliberate failure to avoid messing up watermarks");
-			}
-
-			for (KinesisStreamShard shard : newShardsDueToResharding) {
-				// since there may be delay in discovering a new shard, all new shards due to
-				// resharding should be read starting from the earliest record possible
-				KinesisStreamShardState newShardState =
-					new KinesisStreamShardState(shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
-				int newStateIndex = registerNewSubscribedShardState(newShardState);
-
-				if (LOG.isInfoEnabled()) {
-					LOG.info("Subtask {} has discovered a new shard {} due to resharding, and will start consuming " +
-							"the shard from sequence number {} with ShardConsumer {}",
-						indexOfThisConsumerSubtask, newShardState.getKinesisStreamShard().toString(),
-						newShardState.getLastProcessedSequenceNum(), newStateIndex);
-				}
-
-				shardConsumersExecutor.submit(
-					new ShardConsumer<>(
-						this,
-						newStateIndex,
-						newShardState.getKinesisStreamShard(),
-						newShardState.getLastProcessedSequenceNum()));
-			}
-
-			// we also check if we are running here so that we won't start the discovery sleep
-			// interval if the running flag was set to false during the middle of the while loop
-			if (running && discoveryIntervalMillis != 0) {
-				try {
-					Thread.sleep(discoveryIntervalMillis);
-				} catch (InterruptedException iex) {
-					// the sleep may be interrupted by shutdownFetcher()
-				}
-			}
-		}
-
-		// make sure all resources have been terminated before leaving
-		awaitTermination();
-
-		// any error thrown in the shard consumer threads will be thrown to the main thread
-		Throwable throwable = this.error.get();
-		if (throwable != null) {
-			if (throwable instanceof Exception) {
-				throw (Exception) throwable;
-			} else if (throwable instanceof Error) {
-				throw (Error) throwable;
-			} else {
-				throw new Exception(throwable);
-			}
-		}
-	}
-
-	/**
-	 * Creates a snapshot of the current last processed sequence numbers of each subscribed shard.
-	 *
-	 * @return state snapshot
-	 */
-	public HashMap<KinesisStreamShard, SequenceNumber> snapshotState() {
-		// this method assumes that the checkpoint lock is held
-		assert Thread.holdsLock(checkpointLock);
-
-		HashMap<KinesisStreamShard, SequenceNumber> stateSnapshot = new HashMap<>();
-		for (KinesisStreamShardState shardWithState : subscribedShardsState) {
-			stateSnapshot.put(shardWithState.getKinesisStreamShard(), shardWithState.getLastProcessedSequenceNum());
-		}
-		return stateSnapshot;
-	}
-
-	/**
-	 * Starts shutting down the fetcher. Must be called to allow {@link KinesisDataFetcher#runFetcher()} to complete.
-	 * Once called, the shutdown procedure will be executed and all shard consuming threads will be interrupted.
-	 */
-	public void shutdownFetcher() {
-		running = false;
-		mainThread.interrupt(); // the main thread may be sleeping for the discovery interval
-
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Shutting down the shard consumer threads of subtask {} ...", indexOfThisConsumerSubtask);
-		}
-		shardConsumersExecutor.shutdownNow();
-	}
-
-	/** After calling {@link KinesisDataFetcher#shutdownFetcher()}, this can be called to await the fetcher shutdown */
-	public void awaitTermination() throws InterruptedException {
-		while(!shardConsumersExecutor.isTerminated()) {
-			Thread.sleep(50);
-		}
-	}
-
-	/** Called by created threads to pass on errors. Only the first thrown error is set.
-	 * Once set, the shutdown process will be executed and all shard consuming threads will be interrupted. */
-	protected void stopWithError(Throwable throwable) {
-		if (this.error.compareAndSet(null, throwable)) {
-			shutdownFetcher();
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Functions that update the subscribedStreamToLastDiscoveredShardIds state
-	// ------------------------------------------------------------------------
-
-	/** Updates the last discovered shard of a subscribed stream; only updates if the update is valid */
-	public void advanceLastDiscoveredShardOfStream(String stream, String shardId) {
-		String lastSeenShardIdOfStream = this.subscribedStreamsToLastDiscoveredShardIds.get(stream);
-
-		// the update is valid only if the given shard id is greater
-		// than the previous last seen shard id of the stream
-		if (lastSeenShardIdOfStream == null) {
-			// if not previously set, simply put as the last seen shard id
-			this.subscribedStreamsToLastDiscoveredShardIds.put(stream, shardId);
-		} else if (KinesisStreamShard.compareShardIds(shardId, lastSeenShardIdOfStream) > 0) {
-			this.subscribedStreamsToLastDiscoveredShardIds.put(stream, shardId);
-		}
-	}
-
-	/**
-	 * A utility function that does the following:
-	 *
-	 * 1. Find new shards for each stream that we haven't seen before
-	 * 2. For each new shard, determine whether this consumer subtask should subscribe to them;
-	 * 	  if yes, it is added to the returned list of shards
-	 * 3. Update the subscribedStreamsToLastDiscoveredShardIds state so that we won't get shards
-	 *    that we have already seen before the next time this function is called
-	 */
-	private List<KinesisStreamShard> discoverNewShardsToSubscribe() throws InterruptedException {
-
-		List<KinesisStreamShard> newShardsToSubscribe = new LinkedList<>();
-
-		GetShardListResult shardListResult = kinesis.getShardList(subscribedStreamsToLastDiscoveredShardIds);
-		if (shardListResult.hasRetrievedShards()) {
-			Set<String> streamsWithNewShards = shardListResult.getStreamsWithRetrievedShards();
-
-			for (String stream : streamsWithNewShards) {
-				List<KinesisStreamShard> newShardsOfStream = shardListResult.getRetrievedShardListOfStream(stream);
-				for (KinesisStreamShard newShard : newShardsOfStream) {
-					if (isThisSubtaskShouldSubscribeTo(newShard, totalNumberOfConsumerSubtasks, indexOfThisConsumerSubtask)) {
-						newShardsToSubscribe.add(newShard);
-					}
-				}
-
-				advanceLastDiscoveredShardOfStream(
-					stream, shardListResult.getLastSeenShardOfStream(stream).getShard().getShardId());
-			}
-		}
-
-		return newShardsToSubscribe;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Functions to get / set information about the consumer
-	// ------------------------------------------------------------------------
-
-	public void setIsRestoringFromFailure(boolean bool) {
-		this.isRestoredFromFailure = bool;
-	}
-
-	protected Properties getConsumerConfiguration() {
-		return configProps;
-	}
-
-	protected KinesisDeserializationSchema<T> getClonedDeserializationSchema() {
-		try {
-			return InstantiationUtil.clone(deserializationSchema, runtimeContext.getUserCodeClassLoader());
-		} catch (IOException | ClassNotFoundException ex) {
-			// this really shouldn't happen; simply wrap it around a runtime exception
-			throw new RuntimeException(ex);
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Thread-safe operations for record emitting and shard state updating
-	//  that assure atomicity with respect to the checkpoint lock
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Atomic operation to collect a record and update state to the sequence number of the record.
-	 * This method is called by {@link ShardConsumer}s.
-	 *
-	 * @param record the record to collect
-	 * @param recordTimestamp timestamp to attach to the collected record
-	 * @param shardStateIndex index of the shard to update in subscribedShardsState;
-	 *                        this index should be the returned value from
-	 *                        {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}, called
-	 *                        when the shard state was registered.
-	 * @param lastSequenceNumber the last sequence number value to update
-	 */
-	protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
-		synchronized (checkpointLock) {
-			sourceContext.collectWithTimestamp(record, recordTimestamp);
-			updateState(shardStateIndex, lastSequenceNumber);
-		}
-	}
-
-	/**
-	 * Update the shard to last processed sequence number state.
-	 * This method is called by {@link ShardConsumer}s.
-	 *
-	 * @param shardStateIndex index of the shard to update in subscribedShardsState;
-	 *                        this index should be the returned value from
-	 *                        {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}, called
-	 *                        when the shard state was registered.
-	 * @param lastSequenceNumber the last sequence number value to update
-	 */
-	protected void updateState(int shardStateIndex, SequenceNumber lastSequenceNumber) {
-		synchronized (checkpointLock) {
-			subscribedShardsState.get(shardStateIndex).setLastProcessedSequenceNum(lastSequenceNumber);
-
-			// if a shard's state is updated to be SENTINEL_SHARD_ENDING_SEQUENCE_NUM by its consumer thread,
-			// we've finished reading the shard and should determine it to be non-active
-			if (lastSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
-				this.numberOfActiveShards.decrementAndGet();
-				LOG.info("Subtask {} has reached the end of subscribed shard: {}",
-					indexOfThisConsumerSubtask, subscribedShardsState.get(shardStateIndex).getKinesisStreamShard());
-			}
-		}
-	}
-
-	/**
-	 * Register a new subscribed shard state.
-	 *
-	 * @param newSubscribedShardState the new shard state that this fetcher is to be subscribed to
-	 */
-	public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribedShardState) {
-		synchronized (checkpointLock) {
-			subscribedShardsState.add(newSubscribedShardState);
-
-			// If a registered shard has initial state that is not SENTINEL_SHARD_ENDING_SEQUENCE_NUM (will be the case
-			// if the consumer had already finished reading a shard before we failed and restored), we determine that
-			// this subtask has a new active shard
-			if (!newSubscribedShardState.getLastProcessedSequenceNum().equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
-				this.numberOfActiveShards.incrementAndGet();
-			}
-
-			return subscribedShardsState.size()-1;
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Miscellaneous utility functions
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Utility function to determine whether a shard should be subscribed by this consumer subtask.
-	 *
-	 * @param shard the shard to determine
-	 * @param totalNumberOfConsumerSubtasks total number of consumer subtasks
-	 * @param indexOfThisConsumerSubtask index of this consumer subtask
-	 */
-	private static boolean isThisSubtaskShouldSubscribeTo(KinesisStreamShard shard,
-														int totalNumberOfConsumerSubtasks,
-														int indexOfThisConsumerSubtask) {
-		return (Math.abs(shard.hashCode() % totalNumberOfConsumerSubtasks)) == indexOfThisConsumerSubtask;
-	}
-
-	private static ExecutorService createShardConsumersThreadPool(final String subtaskName) {
-		return Executors.newCachedThreadPool(new ThreadFactory() {
-			@Override
-			public Thread newThread(Runnable runnable) {
-				final AtomicLong threadCount = new AtomicLong(0);
-				Thread thread = new Thread(runnable);
-				thread.setName("shardConsumers-" + subtaskName + "-thread-" + threadCount.getAndIncrement());
-				thread.setDaemon(true);
-				return thread;
-			}
-		});
-	}
-
-	/**
-	 * Utility function to create an initial map of the last discovered shard id of each subscribed stream, set to null;
-	 * This is called in the constructor; correct values will be set later on by calling advanceLastDiscoveredShardOfStream()
-	 *
-	 * @param streams the list of subscribed streams
-	 * @return the initial map for subscribedStreamsToLastDiscoveredShardIds
-	 */
-	protected static HashMap<String, String> createInitialSubscribedStreamsToLastDiscoveredShardsState(List<String> streams) {
-		HashMap<String, String> initial = new HashMap<>();
-		for (String stream : streams) {
-			initial.put(stream, null);
-		}
-		return initial;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
deleted file mode 100644
index 612a4a7..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.internals;
-
-import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
-import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
-import com.amazonaws.services.kinesis.model.GetRecordsResult;
-import com.amazonaws.services.kinesis.model.Record;
-import com.amazonaws.services.kinesis.model.ShardIteratorType;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
-import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
-import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
-import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
-import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
-import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Properties;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Thread that does the actual data pulling from AWS Kinesis shards. Each thread is in charge of one Kinesis shard only.
- */
-public class ShardConsumer<T> implements Runnable {
-
-	private static final Logger LOG = LoggerFactory.getLogger(ShardConsumer.class);
-
-	private final KinesisDeserializationSchema<T> deserializer;
-
-	private final KinesisProxyInterface kinesis;
-
-	private final int subscribedShardStateIndex;
-
-	private final KinesisDataFetcher<T> fetcherRef;
-
-	private final KinesisStreamShard subscribedShard;
-
-	private final int maxNumberOfRecordsPerFetch;
-	private final long fetchIntervalMillis;
-
-	private SequenceNumber lastSequenceNum;
-
-	/**
-	 * Creates a shard consumer.
-	 *
-	 * @param fetcherRef reference to the owning fetcher
-	 * @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to
-	 * @param subscribedShard the shard this consumer is subscribed to
-	 * @param lastSequenceNum the sequence number in the shard to start consuming
-	 */
-	public ShardConsumer(KinesisDataFetcher<T> fetcherRef,
-						Integer subscribedShardStateIndex,
-						KinesisStreamShard subscribedShard,
-						SequenceNumber lastSequenceNum) {
-		this(fetcherRef,
-			subscribedShardStateIndex,
-			subscribedShard,
-			lastSequenceNum,
-			KinesisProxy.create(fetcherRef.getConsumerConfiguration()));
-	}
-
-	/** This constructor is exposed for testing purposes */
-	protected ShardConsumer(KinesisDataFetcher<T> fetcherRef,
-							Integer subscribedShardStateIndex,
-							KinesisStreamShard subscribedShard,
-							SequenceNumber lastSequenceNum,
-							KinesisProxyInterface kinesis) {
-		this.fetcherRef = checkNotNull(fetcherRef);
-		this.subscribedShardStateIndex = checkNotNull(subscribedShardStateIndex);
-		this.subscribedShard = checkNotNull(subscribedShard);
-		this.lastSequenceNum = checkNotNull(lastSequenceNum);
-		checkArgument(
-			!lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()),
-			"Should not start a ShardConsumer if the shard has already been completely read.");
-
-		this.deserializer = fetcherRef.getClonedDeserializationSchema();
-
-		Properties consumerConfig = fetcherRef.getConsumerConfiguration();
-		this.kinesis = kinesis;
-		this.maxNumberOfRecordsPerFetch = Integer.valueOf(consumerConfig.getProperty(
-			ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
-			Integer.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX)));
-		this.fetchIntervalMillis = Long.valueOf(consumerConfig.getProperty(
-			ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
-			Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS)));
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public void run() {
-		String nextShardItr;
-
-		try {
-			// before infinitely looping, we set the initial nextShardItr appropriately
-
-			if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get())) {
-				// if the shard is already closed, there will be no latest next record to get for this shard
-				if (subscribedShard.isClosed()) {
-					nextShardItr = null;
-				} else {
-					nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), null);
-				}
-			} else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get())) {
-				nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.TRIM_HORIZON.toString(), null);
-			} else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
-				nextShardItr = null;
-			} else {
-				// we will be starting from an actual sequence number (due to restore from failure).
-				// if the last sequence number refers to an aggregated record, we need to clean up any dangling sub-records
-				// from the last aggregated record; otherwise, we can simply start iterating from the record right after.
-
-				if (lastSequenceNum.isAggregated()) {
-					String itrForLastAggregatedRecord =
-						kinesis.getShardIterator(subscribedShard, ShardIteratorType.AT_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
-
-					// get only the last aggregated record
-					GetRecordsResult getRecordsResult = getRecords(itrForLastAggregatedRecord, 1);
-
-					List<UserRecord> fetchedRecords = deaggregateRecords(
-						getRecordsResult.getRecords(),
-						subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
-						subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
-
-					long lastSubSequenceNum = lastSequenceNum.getSubSequenceNumber();
-					for (UserRecord record : fetchedRecords) {
-						// we have found a dangling sub-record if it has a larger subsequence number
-						// than our last sequence number; if so, collect the record and update state
-						if (record.getSubSequenceNumber() > lastSubSequenceNum) {
-							deserializeRecordForCollectionAndUpdateState(record);
-						}
-					}
-
-					// set the nextShardItr so we can continue iterating in the next while loop
-					nextShardItr = getRecordsResult.getNextShardIterator();
-				} else {
-					// the last record was non-aggregated, so we can simply start from the next record
-					nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
-				}
-			}
-
-			while(isRunning()) {
-				if (nextShardItr == null) {
-					fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
-
-					// we can close this consumer thread once we've reached the end of the subscribed shard
-					break;
-				} else {
-					if (fetchIntervalMillis != 0) {
-						Thread.sleep(fetchIntervalMillis);
-					}
-
-					GetRecordsResult getRecordsResult = getRecords(nextShardItr, maxNumberOfRecordsPerFetch);
-
-					// each of the Kinesis records may be aggregated, so we must deaggregate them before proceeding
-					List<UserRecord> fetchedRecords = deaggregateRecords(
-						getRecordsResult.getRecords(),
-						subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
-						subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
-
-					for (UserRecord record : fetchedRecords) {
-						deserializeRecordForCollectionAndUpdateState(record);
-					}
-
-					nextShardItr = getRecordsResult.getNextShardIterator();
-				}
-			}
-		} catch (Throwable t) {
-			fetcherRef.stopWithError(t);
-		}
-	}
-
-	/**
-	 * The loop in run() checks this before fetching next batch of records. Since this runnable will be executed
-	 * by the ExecutorService {@link KinesisDataFetcher#shardConsumersExecutor}, the only way to close down this thread
-	 * would be by calling shutdownNow() on {@link KinesisDataFetcher#shardConsumersExecutor} and let the executor service
-	 * interrupt all currently running {@link ShardConsumer}s.
-	 */
-	private boolean isRunning() {
-		return !Thread.interrupted();
-	}
-
-	/**
-	 * Deserializes a record for collection, and accordingly updates the shard state in the fetcher. The last
-	 * successfully collected sequence number in this shard consumer is also updated so that
-	 * {@link ShardConsumer#getRecords(String, int)} may be able to use the correct sequence number to refresh shard
-	 * iterators if necessary.
-	 *
-	 * Note that the server-side Kinesis timestamp is attached to the record when collected. When the
-	 * user programs uses {@link TimeCharacteristic#EventTime}, this timestamp will be used by default.
-	 *
-	 * @param record record to deserialize and collect
-	 * @throws IOException
-	 */
-	private void deserializeRecordForCollectionAndUpdateState(UserRecord record)
-		throws IOException {
-		ByteBuffer recordData = record.getData();
-
-		byte[] dataBytes = new byte[recordData.remaining()];
-		recordData.get(dataBytes);
-
-		final long approxArrivalTimestamp = record.getApproximateArrivalTimestamp().getTime();
-
-		final T value = deserializer.deserialize(
-			dataBytes,
-			record.getPartitionKey(),
-			record.getSequenceNumber(),
-			approxArrivalTimestamp,
-			subscribedShard.getStreamName(),
-			subscribedShard.getShard().getShardId());
-
-		SequenceNumber collectedSequenceNumber = (record.isAggregated())
-			? new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber())
-			: new SequenceNumber(record.getSequenceNumber());
-
-		fetcherRef.emitRecordAndUpdateState(
-			value,
-			approxArrivalTimestamp,
-			subscribedShardStateIndex,
-			collectedSequenceNumber);
-
-		lastSequenceNum = collectedSequenceNumber;
-	}
-
-	/**
-	 * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected
-	 * AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on
-	 * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should
-	 * be used for the next call to this method.
-	 *
-	 * Note: it is important that this method is not called again before all the records from the last result have been
-	 * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise
-	 * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to
-	 * incorrect shard iteration if the iterator had to be refreshed.
-	 *
-	 * @param shardItr shard iterator to use
-	 * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt
-	 * @return get records result
-	 * @throws InterruptedException
-	 */
-	private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException {
-		GetRecordsResult getRecordsResult = null;
-		while (getRecordsResult == null) {
-			try {
-				getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);
-			} catch (ExpiredIteratorException eiEx) {
-				LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
-					" refreshing the iterator ...", shardItr, subscribedShard);
-				shardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
-
-				// sleep for the fetch interval before the next getRecords attempt with the refreshed iterator
-				if (fetchIntervalMillis != 0) {
-					Thread.sleep(fetchIntervalMillis);
-				}
-			}
-		}
-		return getRecordsResult;
-	}
-
-	@SuppressWarnings("unchecked")
-	protected static List<UserRecord> deaggregateRecords(List<Record> records, String startingHashKey, String endingHashKey) {
-		return UserRecord.deaggregate(records, new BigInteger(startingHashKey), new BigInteger(endingHashKey));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
deleted file mode 100644
index 53ed11b..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.model;
-
-import com.amazonaws.services.kinesis.model.Shard;
-
-import java.io.Serializable;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A serializable representation of a AWS Kinesis Stream shard. It is basically a wrapper class around the information
- * provided along with {@link com.amazonaws.services.kinesis.model.Shard}, with some extra utility methods to
- * determine whether or not a shard is closed and whether or not the shard is a result of parent shard splits or merges.
- */
-public class KinesisStreamShard implements Serializable {
-
-	private static final long serialVersionUID = -6004217801761077536L;
-
-	private final String streamName;
-	private final Shard shard;
-
-	private final int cachedHash;
-
-	/**
-	 * Create a new KinesisStreamShard
-	 *
-	 * @param streamName
-	 *           the name of the Kinesis stream that this shard belongs to
-	 * @param shard
-	 *           the actual AWS Shard instance that will be wrapped within this KinesisStreamShard
-	 */
-	public KinesisStreamShard(String streamName, Shard shard) {
-		this.streamName = checkNotNull(streamName);
-		this.shard = checkNotNull(shard);
-
-		// since our description of Kinesis Streams shards can be fully defined with the stream name and shard id,
-		// our hash doesn't need to use hash code of Amazon's description of Shards, which uses other info for calculation
-		int hash = 17;
-		hash = 37 * hash + streamName.hashCode();
-		hash = 37 * hash + shard.getShardId().hashCode();
-		this.cachedHash = hash;
-	}
-
-	public String getStreamName() {
-		return streamName;
-	}
-
-	public boolean isClosed() {
-		return (shard.getSequenceNumberRange().getEndingSequenceNumber() != null);
-	}
-
-	public Shard getShard() {
-		return shard;
-	}
-
-	@Override
-	public String toString() {
-		return "KinesisStreamShard{" +
-			"streamName='" + streamName + "'" +
-			", shard='" + shard.toString() + "'}";
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (!(obj instanceof KinesisStreamShard)) {
-			return false;
-		}
-
-		if (obj == this) {
-			return true;
-		}
-
-		KinesisStreamShard other = (KinesisStreamShard) obj;
-
-		return streamName.equals(other.getStreamName()) && shard.equals(other.getShard());
-	}
-
-	@Override
-	public int hashCode() {
-		return cachedHash;
-	}
-
-	/**
-	 * Utility function to compare two shard ids
-	 *
-	 * @param firstShardId first shard id to compare
-	 * @param secondShardId second shard id to compare
-	 * @return a value less than 0 if the first shard id is smaller than the second shard id,
-	 *         or a value larger than 0 the first shard is larger then the second shard id,
-	 *         or 0 if they are equal
-	 */
-	public static int compareShardIds(String firstShardId, String secondShardId) {
-		if (!isValidShardId(firstShardId)) {
-			throw new IllegalArgumentException("The first shard id has invalid format.");
-		}
-
-		if (!isValidShardId(secondShardId)) {
-			throw new IllegalArgumentException("The second shard id has invalid format.");
-		}
-
-		// digit segment of the shard id starts at index 8
-		return Long.compare(Long.parseLong(firstShardId.substring(8)), Long.parseLong(secondShardId.substring(8)));
-	}
-
-	/**
-	 * Checks if a shard id has valid format.
-	 * Kinesis stream shard ids have 12-digit numbers left-padded with 0's,
-	 * prefixed with "shardId-", ex. "shardId-000000000015".
-	 *
-	 * @param shardId the shard id to check
-	 * @return whether the shard id is valid
-	 */
-	public static boolean isValidShardId(String shardId) {
-		if (shardId == null) { return false; }
-		return shardId.matches("^shardId-\\d{12}");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
deleted file mode 100644
index 00181da..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.model;
-
-/**
- * A wrapper class that bundles a {@link KinesisStreamShard} with its last processed sequence number.
- */
-public class KinesisStreamShardState {
-
-	private KinesisStreamShard kinesisStreamShard;
-	private SequenceNumber lastProcessedSequenceNum;
-
-	public KinesisStreamShardState(KinesisStreamShard kinesisStreamShard, SequenceNumber lastProcessedSequenceNum) {
-		this.kinesisStreamShard = kinesisStreamShard;
-		this.lastProcessedSequenceNum = lastProcessedSequenceNum;
-	}
-
-	public KinesisStreamShard getKinesisStreamShard() {
-		return this.kinesisStreamShard;
-	}
-
-	public SequenceNumber getLastProcessedSequenceNum() {
-		return this.lastProcessedSequenceNum;
-	}
-
-	public void setLastProcessedSequenceNum(SequenceNumber update) {
-		this.lastProcessedSequenceNum = update;
-	}
-
-	@Override
-	public String toString() {
-		return "KinesisStreamShardState{" +
-			"kinesisStreamShard='" + kinesisStreamShard.toString() + "'" +
-			", lastProcessedSequenceNumber='" + lastProcessedSequenceNum.toString() + "'}";
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (!(obj instanceof KinesisStreamShardState)) {
-			return false;
-		}
-
-		if (obj == this) {
-			return true;
-		}
-
-		KinesisStreamShardState other = (KinesisStreamShardState) obj;
-
-		return kinesisStreamShard.equals(other.getKinesisStreamShard()) && lastProcessedSequenceNum.equals(other.getLastProcessedSequenceNum());
-	}
-
-	@Override
-	public int hashCode() {
-		return 37 * (kinesisStreamShard.hashCode() + lastProcessedSequenceNum.hashCode());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
deleted file mode 100644
index 8182201..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.model;
-
-import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
-import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
-
-/**
- * Special flag values for sequence numbers in shards to indicate special positions.
- * The value is initially set by {@link FlinkKinesisConsumer} when {@link KinesisDataFetcher}s are created.
- * The KinesisDataFetchers will use this value to determine how to retrieve the starting shard iterator from AWS Kinesis.
- */
-public enum SentinelSequenceNumber {
-
-	/** Flag value for shard's sequence numbers to indicate that the
-	 * shard should start to be read from the latest incoming records */
-	SENTINEL_LATEST_SEQUENCE_NUM( new SequenceNumber("LATEST_SEQUENCE_NUM") ),
-
-	/** Flag value for shard's sequence numbers to indicate that the shard should
-	 * start to be read from the earliest records that haven't expired yet */
-	SENTINEL_EARLIEST_SEQUENCE_NUM( new SequenceNumber("EARLIEST_SEQUENCE_NUM") ),
-
-	/** Flag value to indicate that we have already read the last record of this shard
-	 * (Note: Kinesis shards that have been closed due to a split or merge will have an ending data record) */
-	SENTINEL_SHARD_ENDING_SEQUENCE_NUM( new SequenceNumber("SHARD_ENDING_SEQUENCE_NUM") );
-
-	private SequenceNumber sentinel;
-
-	SentinelSequenceNumber(SequenceNumber sentinel) {
-		this.sentinel = sentinel;
-	}
-
-	public SequenceNumber get() {
-		return sentinel;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java
deleted file mode 100644
index 021f53f..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.model;
-
-import java.io.Serializable;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A serializable representation of a Kinesis record's sequence number. It has two fields: the main sequence number,
- * and also a subsequence number. If this {@link SequenceNumber} is referring to an aggregated Kinesis record, the
- * subsequence number will be a non-negative value representing the order of the sub-record within the aggregation.
- */
-public class SequenceNumber implements Serializable {
-
-	private static final long serialVersionUID = 876972197938972667L;
-
-	private static final String DELIMITER = "-";
-
-	private final String sequenceNumber;
-	private final long subSequenceNumber;
-
-	private final int cachedHash;
-
-	/**
-	 * Create a new instance for a non-aggregated Kinesis record without a subsequence number.
-	 * @param sequenceNumber the sequence number
-	 */
-	public SequenceNumber(String sequenceNumber) {
-		this(sequenceNumber, -1);
-	}
-
-	/**
-	 * Create a new instance, with the specified sequence number and subsequence number.
-	 * To represent the sequence number for a non-aggregated Kinesis record, the subsequence number should be -1.
-	 * Otherwise, give a non-negative sequence number to represent an aggregated Kinesis record.
-	 *
-	 * @param sequenceNumber the sequence number
-	 * @param subSequenceNumber the subsequence number (-1 to represent non-aggregated Kinesis records)
-	 */
-	public SequenceNumber(String sequenceNumber, long subSequenceNumber) {
-		this.sequenceNumber = checkNotNull(sequenceNumber);
-		this.subSequenceNumber = subSequenceNumber;
-
-		this.cachedHash = 37 * (sequenceNumber.hashCode() + Long.valueOf(subSequenceNumber).hashCode());
-	}
-
-	public boolean isAggregated() {
-		return subSequenceNumber >= 0;
-	}
-
-	public String getSequenceNumber() {
-		return sequenceNumber;
-	}
-
-	public long getSubSequenceNumber() {
-		return subSequenceNumber;
-	}
-
-	@Override
-	public String toString() {
-		if (isAggregated()) {
-			return sequenceNumber + DELIMITER + subSequenceNumber;
-		} else {
-			return sequenceNumber;
-		}
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (!(obj instanceof SequenceNumber)) {
-			return false;
-		}
-
-		if (obj == this) {
-			return true;
-		}
-
-		SequenceNumber other = (SequenceNumber) obj;
-
-		return sequenceNumber.equals(other.getSequenceNumber())
-			&& (subSequenceNumber == other.getSubSequenceNumber());
-	}
-
-	@Override
-	public int hashCode() {
-		return cachedHash;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
deleted file mode 100644
index 04b1654..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.proxy;
-
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Basic model class to bundle the shards retrieved from Kinesis on a {@link KinesisProxyInterface#getShardList(Map)} call.
- */
-public class GetShardListResult {
-
-	private final Map<String, LinkedList<KinesisStreamShard>> streamsToRetrievedShardList = new HashMap<>();
-
-	public void addRetrievedShardToStream(String stream, KinesisStreamShard retrievedShard) {
-		if (!streamsToRetrievedShardList.containsKey(stream)) {
-			streamsToRetrievedShardList.put(stream, new LinkedList<KinesisStreamShard>());
-		}
-		streamsToRetrievedShardList.get(stream).add(retrievedShard);
-	}
-
-	public void addRetrievedShardsToStream(String stream, List<KinesisStreamShard> retrievedShards) {
-		if (retrievedShards.size() != 0) {
-			if (!streamsToRetrievedShardList.containsKey(stream)) {
-				streamsToRetrievedShardList.put(stream, new LinkedList<KinesisStreamShard>());
-			}
-			streamsToRetrievedShardList.get(stream).addAll(retrievedShards);
-		}
-	}
-
-	public List<KinesisStreamShard> getRetrievedShardListOfStream(String stream) {
-		if (!streamsToRetrievedShardList.containsKey(stream)) {
-			return null;
-		} else {
-			return streamsToRetrievedShardList.get(stream);
-		}
-	}
-
-	public KinesisStreamShard getLastSeenShardOfStream(String stream) {
-		if (!streamsToRetrievedShardList.containsKey(stream)) {
-			return null;
-		} else {
-			return streamsToRetrievedShardList.get(stream).getLast();
-		}
-	}
-
-	public boolean hasRetrievedShards() {
-		return !streamsToRetrievedShardList.isEmpty();
-	}
-
-	public Set<String> getStreamsWithRetrievedShards() {
-		return streamsToRetrievedShardList.keySet();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
deleted file mode 100644
index 9ffc8e6..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ /dev/null
@@ -1,338 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.proxy;
-
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
-import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
-import com.amazonaws.services.kinesis.model.DescribeStreamResult;
-import com.amazonaws.services.kinesis.model.GetRecordsRequest;
-import com.amazonaws.services.kinesis.model.GetRecordsResult;
-import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
-import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
-import com.amazonaws.services.kinesis.model.LimitExceededException;
-import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
-import com.amazonaws.services.kinesis.model.StreamStatus;
-import com.amazonaws.services.kinesis.model.Shard;
-import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
-import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-import java.util.Map;
-import java.util.Random;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Kinesis proxy implementation - a utility class that is used as a proxy to make
- * calls to AWS Kinesis for several functions, such as getting a list of shards and
- * fetching a batch of data records starting from a specified record sequence number.
- *
- * NOTE:
- * In the AWS KCL library, there is a similar implementation - {@link com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy}.
- * This implementation differs mainly in that we can make operations to arbitrary Kinesis streams, which is a needed
- * functionality for the Flink Kinesis Connecter since the consumer may simultaneously read from multiple Kinesis streams.
- */
-public class KinesisProxy implements KinesisProxyInterface {
-
-	private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class);
-
-	/** The actual Kinesis client from the AWS SDK that we will be using to make calls */
-	private final AmazonKinesisClient kinesisClient;
-
-	/** Random seed used to calculate backoff jitter for Kinesis operations */
-	private final static Random seed = new Random();
-
-	// ------------------------------------------------------------------------
-	//  describeStream() related performance settings
-	// ------------------------------------------------------------------------
-
-	/** Base backoff millis for the describe stream operation */
-	private final long describeStreamBaseBackoffMillis;
-
-	/** Maximum backoff millis for the describe stream operation */
-	private final long describeStreamMaxBackoffMillis;
-
-	/** Exponential backoff power constant for the describe stream operation */
-	private final double describeStreamExpConstant;
-
-	// ------------------------------------------------------------------------
-	//  getRecords() related performance settings
-	// ------------------------------------------------------------------------
-
-	/** Base backoff millis for the get records operation */
-	private final long getRecordsBaseBackoffMillis;
-
-	/** Maximum backoff millis for the get records operation */
-	private final long getRecordsMaxBackoffMillis;
-
-	/** Exponential backoff power constant for the get records operation */
-	private final double getRecordsExpConstant;
-
-	/** Maximum attempts for the get records operation */
-	private final int getRecordsMaxAttempts;
-
-	// ------------------------------------------------------------------------
-	//  getShardIterator() related performance settings
-	// ------------------------------------------------------------------------
-
-	/** Base backoff millis for the get shard iterator operation */
-	private final long getShardIteratorBaseBackoffMillis;
-
-	/** Maximum backoff millis for the get shard iterator operation */
-	private final long getShardIteratorMaxBackoffMillis;
-
-	/** Exponential backoff power constant for the get shard iterator operation */
-	private final double getShardIteratorExpConstant;
-
-	/** Maximum attempts for the get shard iterator operation */
-	private final int getShardIteratorMaxAttempts;
-
-	/**
-	 * Create a new KinesisProxy based on the supplied configuration properties
-	 *
-	 * @param configProps configuration properties containing AWS credential and AWS region info
-	 */
-	private KinesisProxy(Properties configProps) {
-		checkNotNull(configProps);
-
-		this.kinesisClient = AWSUtil.createKinesisClient(configProps);
-
-		this.describeStreamBaseBackoffMillis = Long.valueOf(
-			configProps.getProperty(
-				ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE,
-				Long.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE)));
-		this.describeStreamMaxBackoffMillis = Long.valueOf(
-			configProps.getProperty(
-				ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX,
-				Long.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX)));
-		this.describeStreamExpConstant = Double.valueOf(
-			configProps.getProperty(
-				ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT,
-				Double.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT)));
-
-		this.getRecordsBaseBackoffMillis = Long.valueOf(
-			configProps.getProperty(
-				ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE,
-				Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_BASE)));
-		this.getRecordsMaxBackoffMillis = Long.valueOf(
-			configProps.getProperty(
-				ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX,
-				Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_MAX)));
-		this.getRecordsExpConstant = Double.valueOf(
-			configProps.getProperty(
-				ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT,
-				Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT)));
-		this.getRecordsMaxAttempts = Integer.valueOf(
-			configProps.getProperty(
-				ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES,
-				Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_RETRIES)));
-
-		this.getShardIteratorBaseBackoffMillis = Long.valueOf(
-			configProps.getProperty(
-				ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE,
-				Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_BASE)));
-		this.getShardIteratorMaxBackoffMillis = Long.valueOf(
-			configProps.getProperty(
-				ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX,
-				Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_MAX)));
-		this.getShardIteratorExpConstant = Double.valueOf(
-			configProps.getProperty(
-				ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT,
-				Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT)));
-		this.getShardIteratorMaxAttempts = Integer.valueOf(
-			configProps.getProperty(
-				ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES,
-				Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_RETRIES)));
-
-	}
-
-	/**
-	 * Creates a Kinesis proxy.
-	 *
-	 * @param configProps configuration properties
-	 * @return the created kinesis proxy
-	 */
-	public static KinesisProxyInterface create(Properties configProps) {
-		return new KinesisProxy(configProps);
-	}
-
-	/**
-	 * {@inheritDoc}
-	 */
-	@Override
-	public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException {
-		final GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
-		getRecordsRequest.setShardIterator(shardIterator);
-		getRecordsRequest.setLimit(maxRecordsToGet);
-
-		GetRecordsResult getRecordsResult = null;
-
-		int attempt = 0;
-		while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) {
-			try {
-				getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
-			} catch (ProvisionedThroughputExceededException ex) {
-				long backoffMillis = fullJitterBackoff(
-					getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
-				LOG.warn("Got ProvisionedThroughputExceededException. Backing off for "
-					+ backoffMillis + " millis.");
-				Thread.sleep(backoffMillis);
-			}
-		}
-
-		if (getRecordsResult == null) {
-			throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxAttempts +
-				" retry attempts returned ProvisionedThroughputExceededException.");
-		}
-
-		return getRecordsResult;
-	}
-
-	/**
-	 * {@inheritDoc}
-	 */
-	@Override
-	public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) throws InterruptedException {
-		GetShardListResult result = new GetShardListResult();
-
-		for (Map.Entry<String,String> streamNameWithLastSeenShardId : streamNamesWithLastSeenShardIds.entrySet()) {
-			String stream = streamNameWithLastSeenShardId.getKey();
-			String lastSeenShardId = streamNameWithLastSeenShardId.getValue();
-			result.addRetrievedShardsToStream(stream, getShardsOfStream(stream, lastSeenShardId));
-		}
-		return result;
-	}
-
-	/**
-	 * {@inheritDoc}
-	 */
-	@Override
-	public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException {
-		GetShardIteratorResult getShardIteratorResult = null;
-
-		int attempt = 0;
-		while (attempt <= getShardIteratorMaxAttempts && getShardIteratorResult == null) {
-			try {
-				getShardIteratorResult =
-					kinesisClient.getShardIterator(shard.getStreamName(), shard.getShard().getShardId(), shardIteratorType, startingSeqNum);
-			} catch (ProvisionedThroughputExceededException ex) {
-				long backoffMillis = fullJitterBackoff(
-					getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, attempt++);
-				LOG.warn("Got ProvisionedThroughputExceededException. Backing off for "
-					+ backoffMillis + " millis.");
-				Thread.sleep(backoffMillis);
-			}
-		}
-
-		if (getShardIteratorResult == null) {
-			throw new RuntimeException("Rate Exceeded for getShardIterator operation - all " + getShardIteratorMaxAttempts +
-				" retry attempts returned ProvisionedThroughputExceededException.");
-		}
-		return getShardIteratorResult.getShardIterator();
-	}
-
-	private List<KinesisStreamShard> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException {
-		List<KinesisStreamShard> shardsOfStream = new ArrayList<>();
-
-		DescribeStreamResult describeStreamResult;
-		do {
-			describeStreamResult = describeStream(streamName, lastSeenShardId);
-
-			List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
-			for (Shard shard : shards) {
-				shardsOfStream.add(new KinesisStreamShard(streamName, shard));
-			}
-
-			if (shards.size() != 0) {
-				lastSeenShardId = shards.get(shards.size() - 1).getShardId();
-			}
-		} while (describeStreamResult.getStreamDescription().isHasMoreShards());
-
-		return shardsOfStream;
-	}
-
-	/**
-	 * Get metainfo for a Kinesis stream, which contains information about which shards this Kinesis stream possess.
-	 *
-	 * This method is using a "full jitter" approach described in AWS's article,
-	 * <a href="https://www.awsarchitectureblog.com/2015/03/backoff.html">"Exponential Backoff and Jitter"</a>.
-	 * This is necessary because concurrent calls will be made by all parallel subtask's fetcher. This
-	 * jitter backoff approach will help distribute calls across the fetchers over time.
-	 *
-	 * @param streamName the stream to describe
-	 * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result)
-	 * @return the result of the describe stream operation
-	 */
-	private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException {
-		final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
-		describeStreamRequest.setStreamName(streamName);
-		describeStreamRequest.setExclusiveStartShardId(startShardId);
-
-		DescribeStreamResult describeStreamResult = null;
-
-		// Call DescribeStream, with full-jitter backoff (if we get LimitExceededException).
-		int attemptCount = 0;
-		while (describeStreamResult == null) { // retry until we get a result
-			try {
-				describeStreamResult = kinesisClient.describeStream(describeStreamRequest);
-			} catch (LimitExceededException le) {
-				long backoffMillis = fullJitterBackoff(
-					describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++);
-				LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for "
-					+ backoffMillis + " millis.");
-				Thread.sleep(backoffMillis);
-			} catch (ResourceNotFoundException re) {
-				throw new RuntimeException("Error while getting stream details", re);
-			}
-		}
-
-		String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus();
-		if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || streamStatus.equals(StreamStatus.UPDATING.toString()))) {
-			if (LOG.isWarnEnabled()) {
-				LOG.warn("The status of stream " + streamName + " is " + streamStatus + "; result of the current " +
-					"describeStream operation will not contain any shard information.");
-			}
-		}
-
-		// Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before the exclusive
-		// start shard id in the returned shards list; check if we need to remove these erroneously returned shards
-		if (startShardId != null) {
-			List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
-			Iterator<Shard> shardItr = shards.iterator();
-			while (shardItr.hasNext()) {
-				if (KinesisStreamShard.compareShardIds(shardItr.next().getShardId(), startShardId) <= 0) {
-					shardItr.remove();
-				}
-			}
-		}
-
-		return describeStreamResult;
-	}
-
-	private static long fullJitterBackoff(long base, long max, double power, int attempt) {
-		long exponentialBackoff = (long) Math.min(max, base * Math.pow(power, attempt));
-		return (long)(seed.nextDouble()*exponentialBackoff); // random jitter between 0 and the exponential backoff
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
deleted file mode 100644
index 39ddc52..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.proxy;
-
-import com.amazonaws.services.kinesis.model.GetRecordsResult;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
-
-import java.util.Map;
-
-/**
- * Interface for a Kinesis proxy that operates on multiple Kinesis streams within the same AWS service region.
- */
-public interface KinesisProxyInterface {
-
-	/**
-	 * Get a shard iterator from the specified position in a shard.
-	 * The retrieved shard iterator can be used in {@link KinesisProxyInterface#getRecords(String, int)}}
-	 * to read data from the Kinesis shard.
-	 *
-	 * @param shard the shard to get the iterator
-	 * @param shardIteratorType the iterator type, defining how the shard is to be iterated
-	 *                          (one of: TRIM_HORIZON, LATEST, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER)
-	 * @param startingSeqNum sequence number, must be null if shardIteratorType is TRIM_HORIZON or LATEST
-	 * @return shard iterator which can be used to read data from Kinesis
-	 * @throws InterruptedException this method will retry with backoff if AWS Kinesis complains that the
-	 *                              operation has exceeded the rate limit; this exception will be thrown
-	 *                              if the backoff is interrupted.
-	 */
-	String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) throws InterruptedException;
-
-	/**
-	 * Get the next batch of data records using a specific shard iterator
-	 *
-	 * @param shardIterator a shard iterator that encodes info about which shard to read and where to start reading
-	 * @param maxRecordsToGet the maximum amount of records to retrieve for this batch
-	 * @return the batch of retrieved records, also with a shard iterator that can be used to get the next batch
-	 * @throws InterruptedException this method will retry with backoff if AWS Kinesis complains that the
-	 *                              operation has exceeded the rate limit; this exception will be thrown
-	 *                              if the backoff is interrupted.
-	 */
-	GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException;
-
-	/**
-	 * Get shard list of multiple Kinesis streams, ignoring the
-	 * shards of each stream before a specified last seen shard id.
-	 *
-	 * @param streamNamesWithLastSeenShardIds a map with stream as key, and last seen shard id as value
-	 * @return result of the shard list query
-	 * @throws InterruptedException this method will retry with backoff if AWS Kinesis complains that the
-	 *                              operation has exceeded the rate limit; this exception will be thrown
-	 *                              if the backoff is interrupted.
-	 */
-	GetShardListResult getShardList(Map<String,String> streamNamesWithLastSeenShardIds) throws InterruptedException;
-}