You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:34:52 UTC
[02/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors]
Merge batch and streaming connectors into common Maven module.
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
deleted file mode 100644
index a06fdca..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ /dev/null
@@ -1,679 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.internals;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
-import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
-import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
-import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
-import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
-import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
-import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
-import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
-import org.apache.flink.util.InstantiationUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A KinesisDataFetcher is responsible for fetching data from multiple Kinesis shards. Each parallel subtask instantiates
- * and runs a single fetcher throughout the subtask's lifetime. The fetcher accomplishes the following:
- * <ul>
- * <li>1. continuously poll Kinesis to discover shards that the subtask should subscribe to. The subscribed subset
- * of shards, including future new shards, is non-overlapping across subtasks (no two subtasks will be
- * subscribed to the same shard) and determinate across subtask restores (the subtask will always subscribe
- * to the same subset of shards even after restoring)</li>
- * <li>2. decide from where in each discovered shard the fetcher should start subscribing</li>
- * <li>3. subscribe to shards by creating a single thread for each shard</li>
- * </ul>
- *
- * <p>The fetcher manages two states: 1) last seen shard ids of each subscribed stream (used for continuous shard discovery),
- * and 2) last processed sequence numbers of each subscribed shard. Since operations on the second state will be performed
- * by multiple threads, these operations should only be done using the handler methods provided in this class.
- */
-public class KinesisDataFetcher<T> {
-
- private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class);
-
- // ------------------------------------------------------------------------
- // Consumer-wide settings
- // ------------------------------------------------------------------------
-
- /** Configuration properties for the Flink Kinesis Consumer */
- private final Properties configProps;
-
- /** The list of Kinesis streams that the consumer is subscribing to */
- private final List<String> streams;
-
- /**
- * The deserialization schema we will be using to convert Kinesis records to Flink objects.
- * Note that since this might not be thread-safe, {@link ShardConsumer}s using this must
- * clone a copy using {@link KinesisDataFetcher#getClonedDeserializationSchema()}.
- */
- private final KinesisDeserializationSchema<T> deserializationSchema;
-
- // ------------------------------------------------------------------------
- // Subtask-specific settings
- // ------------------------------------------------------------------------
-
- /** Runtime context of the subtask that this fetcher was created in */
- private final RuntimeContext runtimeContext;
-
- /** Total number of parallel consumer subtasks, as reported by the runtime context */
- private final int totalNumberOfConsumerSubtasks;
-
- /** Index of this consumer subtask, as reported by the runtime context */
- private final int indexOfThisConsumerSubtask;
-
- /**
- * This flag should be set by {@link FlinkKinesisConsumer} using
- * {@link KinesisDataFetcher#setIsRestoringFromFailure(boolean)}
- */
- private boolean isRestoredFromFailure;
-
- // ------------------------------------------------------------------------
- // Executor services to run created threads
- // ------------------------------------------------------------------------
-
- /** Executor service to run {@link ShardConsumer}s to consume Kinesis shards */
- private final ExecutorService shardConsumersExecutor;
-
- // ------------------------------------------------------------------------
- // Managed state, accessed and updated across multiple threads
- // ------------------------------------------------------------------------
-
- /** The last discovered shard ids of each subscribed stream, updated as the fetcher discovers new shards.
- * Note: this state will be updated if new shards are found when {@link KinesisDataFetcher#discoverNewShardsToSubscribe()} is called.
- */
- private final Map<String, String> subscribedStreamsToLastDiscoveredShardIds;
-
- /**
- * The shards, along with their last processed sequence numbers, that this fetcher is subscribed to. The fetcher
- * will add new subscribed shard states to this list as it discovers new shards. {@link ShardConsumer} threads update
- * the last processed sequence number of subscribed shards as they fetch and process records.
- *
- * <p>Note that since multiple {@link ShardConsumer} threads will be performing operations on this list, all operations
- * must be wrapped in synchronized blocks on the {@link KinesisDataFetcher#checkpointLock} lock. For this purpose,
- * all threads must use the following thread-safe methods this class provides to operate on this list:
- * <ul>
- * <li>{@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}</li>
- * <li>{@link KinesisDataFetcher#updateState(int, SequenceNumber)}</li>
- * <li>{@link KinesisDataFetcher#emitRecordAndUpdateState(T, long, int, SequenceNumber)}</li>
- * </ul>
- */
- private final List<KinesisStreamShardState> subscribedShardsState;
-
- private final SourceFunction.SourceContext<T> sourceContext;
-
- /** Checkpoint lock, also used to synchronize operations on subscribedShardsState */
- private final Object checkpointLock;
-
- /** Reference to the first error thrown by any of the {@link ShardConsumer} threads */
- private final AtomicReference<Throwable> error;
-
- /** The Kinesis proxy that the fetcher will be using to discover new shards */
- private final KinesisProxyInterface kinesis;
-
- /** Thread that executed runFetcher() */
- private Thread mainThread;
-
- /**
- * The current number of shards that are actively read by this fetcher.
- *
- * This value is updated in {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)},
- * and {@link KinesisDataFetcher#updateState(int, SequenceNumber)}.
- */
- private final AtomicInteger numberOfActiveShards = new AtomicInteger(0);
-
- private volatile boolean running = true;
-
- /**
- * Creates a Kinesis Data Fetcher.
- *
- * <p>Delegates to the protected constructor, using the source context's checkpoint lock, an empty error
- * reference, an empty list of subscribed shard states, a last-discovered-shard map holding {@code null} for
- * every stream, and a Kinesis proxy created from {@code configProps}.
- *
- * @param streams the streams to subscribe to
- * @param sourceContext context of the source function
- * @param runtimeContext this subtask's runtime context
- * @param configProps the consumer configuration properties
- * @param deserializationSchema deserialization schema
- */
- public KinesisDataFetcher(List<String> streams,
- SourceFunction.SourceContext<T> sourceContext,
- RuntimeContext runtimeContext,
- Properties configProps,
- KinesisDeserializationSchema<T> deserializationSchema) {
- this(streams,
- sourceContext,
- sourceContext.getCheckpointLock(),
- runtimeContext,
- configProps,
- deserializationSchema,
- new AtomicReference<Throwable>(),
- new LinkedList<KinesisStreamShardState>(),
- createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
- KinesisProxy.create(configProps));
- }
-
- /**
- * Creates a Kinesis Data Fetcher with explicitly provided internal state and Kinesis proxy.
- * This constructor is exposed for testing purposes.
- *
- * <p>Every argument is checked to be non-null.
- *
- * @param streams the streams to subscribe to
- * @param sourceContext context of the source function
- * @param checkpointLock the lock used to guard record emission and updates to the subscribed shard states
- * @param runtimeContext this subtask's runtime context
- * @param configProps the consumer configuration properties
- * @param deserializationSchema deserialization schema
- * @param error reference that will hold the first error reported by a shard consumer thread
- * @param subscribedShardsState the list of subscribed shard states to start with
- * @param subscribedStreamsToLastDiscoveredShardIds the last discovered shard id of each subscribed stream to start with
- * @param kinesis the Kinesis proxy used to discover shards
- */
- protected KinesisDataFetcher(List<String> streams,
- SourceFunction.SourceContext<T> sourceContext,
- Object checkpointLock,
- RuntimeContext runtimeContext,
- Properties configProps,
- KinesisDeserializationSchema<T> deserializationSchema,
- AtomicReference<Throwable> error,
- LinkedList<KinesisStreamShardState> subscribedShardsState,
- HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds,
- KinesisProxyInterface kinesis) {
- this.streams = checkNotNull(streams);
- this.configProps = checkNotNull(configProps);
- this.sourceContext = checkNotNull(sourceContext);
- this.checkpointLock = checkNotNull(checkpointLock);
- this.runtimeContext = checkNotNull(runtimeContext);
- this.totalNumberOfConsumerSubtasks = runtimeContext.getNumberOfParallelSubtasks();
- this.indexOfThisConsumerSubtask = runtimeContext.getIndexOfThisSubtask();
- this.deserializationSchema = checkNotNull(deserializationSchema);
- this.kinesis = checkNotNull(kinesis);
-
- this.error = checkNotNull(error);
- this.subscribedShardsState = checkNotNull(subscribedShardsState);
- this.subscribedStreamsToLastDiscoveredShardIds = checkNotNull(subscribedStreamsToLastDiscoveredShardIds);
-
- // the thread pool that runs the shard consumers; it is created with this subtask's task name
- this.shardConsumersExecutor =
- createShardConsumersThreadPool(runtimeContext.getTaskNameWithSubtasks());
- }
-
- /**
- * Starts the fetcher and blocks the calling thread until the fetcher stops. After starting the fetcher, it can only
- * be stopped by calling {@link KinesisDataFetcher#shutdownFetcher()}, either directly or through
- * {@link KinesisDataFetcher#stopWithError(Throwable)}.
- *
- * <p>The method returns immediately if the fetcher has already been shut down. Otherwise it
- * (1) discovers the shards this subtask should subscribe to and registers their starting state,
- * (2) fails if no shard has been discovered for any of the subscribed streams,
- * (3) submits a {@link ShardConsumer} for every registered shard that has not been completely read, and
- * (4) keeps discovering and subscribing to new shards until the running flag is cleared.
- * Before returning normally, it waits for the shard consumer executor to terminate.
- *
- * @throws Exception the first error or exception thrown by the fetcher or any of the threads created by the fetcher.
- */
- public void runFetcher() throws Exception {
-
- // check that we are running before proceeding
- if (!running) {
- return;
- }
-
- // remember the calling thread so that shutdownFetcher() can interrupt it while it sleeps
- this.mainThread = Thread.currentThread();
-
- // ------------------------------------------------------------------------
- // Procedures before starting the infinite while loop:
- // ------------------------------------------------------------------------
-
- // 1. query for any new shards that may have been created while the Kinesis consumer was not running,
- // and register them to the subscribedShardState list.
- if (LOG.isDebugEnabled()) {
- String logFormat = (!isRestoredFromFailure)
- ? "Subtask {} is trying to discover initial shards ..."
- : "Subtask {} is trying to discover any new shards that were created while the consumer wasn't " +
- "running due to failure ...";
-
- LOG.debug(logFormat, indexOfThisConsumerSubtask);
- }
- List<KinesisStreamShard> newShardsCreatedWhileNotRunning = discoverNewShardsToSubscribe();
- for (KinesisStreamShard shard : newShardsCreatedWhileNotRunning) {
- // the starting state for new shards created while the consumer wasn't running depends on whether or not
- // we are starting fresh (not restoring from a checkpoint); when we are starting fresh, this simply means
- // all existing shards of streams we are subscribing to are new shards; when we are restoring from checkpoint,
- // any new shards due to Kinesis resharding from the time of the checkpoint will be considered new shards.
- InitialPosition initialPosition = InitialPosition.valueOf(configProps.getProperty(
- ConsumerConfigConstants.STREAM_INITIAL_POSITION, ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION));
-
- SentinelSequenceNumber startingStateForNewShard = (isRestoredFromFailure)
- ? SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM
- : initialPosition.toSentinelSequenceNumber();
-
- if (LOG.isInfoEnabled()) {
- String logFormat = (!isRestoredFromFailure)
- ? "Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}"
- : "Subtask {} will be seeded with new shard {} that was created while the consumer wasn't " +
- "running due to failure, starting state set as sequence number {}";
-
- LOG.info(logFormat, indexOfThisConsumerSubtask, shard.toString(), startingStateForNewShard.get());
- }
- registerNewSubscribedShardState(new KinesisStreamShardState(shard, startingStateForNewShard.get()));
- }
-
- // 2. check that there is at least one shard in the subscribed streams to consume from (can be done by
- // checking if at least one value in subscribedStreamsToLastDiscoveredShardIds is not null)
- boolean hasShards = false;
- StringBuilder streamsWithNoShardsFound = new StringBuilder();
- for (Map.Entry<String, String> streamToLastDiscoveredShardEntry : subscribedStreamsToLastDiscoveredShardIds.entrySet()) {
- if (streamToLastDiscoveredShardEntry.getValue() != null) {
- hasShards = true;
- } else {
- streamsWithNoShardsFound.append(streamToLastDiscoveredShardEntry.getKey()).append(", ");
- }
- }
-
- if (streamsWithNoShardsFound.length() != 0 && LOG.isWarnEnabled()) {
- LOG.warn("Subtask {} has failed to find any shards for the following subscribed streams: {}",
- indexOfThisConsumerSubtask, streamsWithNoShardsFound.toString());
- }
-
- if (!hasShards) {
- throw new RuntimeException("No shards can be found for all subscribed streams: " + streams);
- }
-
- // 3. start consuming any shard state we already have in the subscribedShardState up to this point; the
- // subscribedShardState may already be seeded with values due to step 1., or explicitly added by the
- // consumer using a restored state checkpoint
- for (int seededStateIndex = 0; seededStateIndex < subscribedShardsState.size(); seededStateIndex++) {
- KinesisStreamShardState seededShardState = subscribedShardsState.get(seededStateIndex);
-
- // only start a consuming thread if the seeded subscribed shard has not been completely read already
- if (!seededShardState.getLastProcessedSequenceNum().equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
-
- if (LOG.isInfoEnabled()) {
- LOG.info("Subtask {} will start consuming seeded shard {} from sequence number {} with ShardConsumer {}",
- indexOfThisConsumerSubtask, seededShardState.getKinesisStreamShard().toString(),
- seededShardState.getLastProcessedSequenceNum(), seededStateIndex);
- }
-
- shardConsumersExecutor.submit(
- new ShardConsumer<>(
- this,
- seededStateIndex,
- subscribedShardsState.get(seededStateIndex).getKinesisStreamShard(),
- subscribedShardsState.get(seededStateIndex).getLastProcessedSequenceNum()));
- }
- }
-
- // ------------------------------------------------------------------------
-
- // finally, start the infinite shard discovery and consumer launching loop;
- // we will escape from this loop only when shutdownFetcher() or stopWithError() is called
-
- final long discoveryIntervalMillis = Long.valueOf(
- configProps.getProperty(
- ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS,
- Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS)));
-
- // FLINK-4341:
- // For downstream operators that work on time (ex. window operators), we are required to emit a max value watermark
- // for subtasks that won't continue to have shards to read from unless resharding happens in the future, otherwise
- // the downstream watermarks would not advance, leading to unbounded accumulating state.
- //
- // The side-effect of this limitation is that on resharding, we must fail hard if the newly discovered shard
- // is to be subscribed by a subtask that has previously emitted a max value watermark, otherwise the watermarks
- // will be messed up.
- //
- // There are 2 cases where we need to either emit a max value watermark, or deliberately fail hard:
- // (a) if this subtask has no more shards to read from unless resharding happens in the future, we emit a max
- // value watermark. This case is encountered when 1) all previously read shards by this subtask were closed
- // due to resharding, 2) when this subtask was initially only subscribed to closed shards while the consumer
- // was told to start from TRIM_HORIZON, or 3) there were initially no shards for this subtask to read on startup.
- // (b) this subtask has discovered new shards to read from due to a reshard; if this subtask has already emitted
- // a max value watermark, we must deliberately fail hard to avoid messing up the watermarks. The new shards
- // will be subscribed by this subtask after restore as initial shards on startup.
- //
- // TODO: This is a temporary workaround until a min-watermark information service is available in the JobManager
- // Please see FLINK-4341 for more detail
-
- boolean emittedMaxValueWatermark = false;
-
- if (this.numberOfActiveShards.get() == 0) {
- // FLINK-4341 workaround case (a) - please see the above for details on this case
- LOG.info("Subtask {} has no initial shards to read on startup; emitting max value watermark ...",
- indexOfThisConsumerSubtask);
- sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
- emittedMaxValueWatermark = true;
- }
-
- while (running) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Subtask {} is trying to discover new shards that were created due to resharding ...",
- indexOfThisConsumerSubtask);
- }
- List<KinesisStreamShard> newShardsDueToResharding = discoverNewShardsToSubscribe();
-
- // -- NOTE: Potential race condition between newShardsDueToResharding and numberOfActiveShards --
- // Since numberOfActiveShards is updated by parallel shard consuming threads in updateState(), there exists
- // a race condition with the currently queried newShardsDueToResharding. Therefore, numberOfActiveShards
- // may not correctly reflect the discover result in the below case determination. This may lead to incorrect
- // case determination on the current discovery attempt, but can still be correctly handled on future attempts.
- //
- // Although this can be resolved by wrapping the current shard discovery attempt with the below
- // case determination within a synchronized block on the checkpoint lock for atomicity, there will be
- // considerable throughput performance regression as shard discovery is a remote call to AWS. Therefore,
- // since the case determination is a temporary workaround for FLINK-4341, the race condition is tolerable as
- // we can still eventually handle max value watermark emitting / deliberately failing on successive
- // discovery attempts.
-
- if (newShardsDueToResharding.size() == 0 && this.numberOfActiveShards.get() == 0 && !emittedMaxValueWatermark) {
- // FLINK-4341 workaround case (a) - please see the above for details on this case
- LOG.info("Subtask {} has completed reading all shards; emitting max value watermark ...",
- indexOfThisConsumerSubtask);
- sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
- emittedMaxValueWatermark = true;
- } else if (newShardsDueToResharding.size() > 0 && emittedMaxValueWatermark) {
- // FLINK-4341 workaround case (b) - please see the above for details on this case
- //
- // Note that in the case where on resharding this subtask ceased to read all of its previous shards
- // but new shards are also to be subscribed by this subtask immediately after, emittedMaxValueWatermark
- // will be false; this allows the fetcher to continue reading the new shards without failing on such cases.
- // However, due to the race condition mentioned above, we might still fall into case (a) first, and
- // then (b) on the next discovery attempt. Although the failure is ideally unnecessary, max value
- // watermark emitting still remains to be correct.
-
- LOG.warn("Subtask {} has discovered {} new shards to subscribe, but is failing hard to avoid messing" +
- " up watermarks; the new shards will be subscribed by this subtask after restore ...",
- indexOfThisConsumerSubtask, newShardsDueToResharding.size());
- throw new RuntimeException("Deliberate failure to avoid messing up watermarks");
- }
-
- for (KinesisStreamShard shard : newShardsDueToResharding) {
- // since there may be delay in discovering a new shard, all new shards due to
- // resharding should be read starting from the earliest record possible
- KinesisStreamShardState newShardState =
- new KinesisStreamShardState(shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
- int newStateIndex = registerNewSubscribedShardState(newShardState);
-
- if (LOG.isInfoEnabled()) {
- LOG.info("Subtask {} has discovered a new shard {} due to resharding, and will start consuming " +
- "the shard from sequence number {} with ShardConsumer {}",
- indexOfThisConsumerSubtask, newShardState.getKinesisStreamShard().toString(),
- newShardState.getLastProcessedSequenceNum(), newStateIndex);
- }
-
- shardConsumersExecutor.submit(
- new ShardConsumer<>(
- this,
- newStateIndex,
- newShardState.getKinesisStreamShard(),
- newShardState.getLastProcessedSequenceNum()));
- }
-
- // we also check if we are running here so that we won't start the discovery sleep
- // interval if the running flag was set to false during the middle of the while loop
- if (running && discoveryIntervalMillis != 0) {
- try {
- Thread.sleep(discoveryIntervalMillis);
- } catch (InterruptedException iex) {
- // the sleep may be interrupted by shutdownFetcher(); deliberately ignored here,
- // since the loop condition re-checks the running flag right after
- }
- }
- }
-
- // make sure all resources have been terminated before leaving
- awaitTermination();
-
- // any error thrown in the shard consumer threads will be thrown to the main thread
- Throwable throwable = this.error.get();
- if (throwable != null) {
- if (throwable instanceof Exception) {
- throw (Exception) throwable;
- } else if (throwable instanceof Error) {
- throw (Error) throwable;
- } else {
- // neither an Exception nor an Error: wrap it so that it can be thrown from this method
- throw new Exception(throwable);
- }
- }
- }
-
- /**
-  * Captures the last processed sequence number of every shard this fetcher is currently subscribed to.
-  *
-  * <p>The caller is expected to hold the checkpoint lock; this is only verified through an assertion.
-  *
-  * @return a newly created map from each subscribed shard to its last processed sequence number
-  */
- public HashMap<KinesisStreamShard, SequenceNumber> snapshotState() {
-     // the checkpoint lock must already be held by the calling thread
-     assert Thread.holdsLock(checkpointLock);
-
-     final HashMap<KinesisStreamShard, SequenceNumber> snapshot = new HashMap<>();
-     for (final KinesisStreamShardState shardState : subscribedShardsState) {
-         final KinesisStreamShard shard = shardState.getKinesisStreamShard();
-         final SequenceNumber lastProcessed = shardState.getLastProcessedSequenceNum();
-         snapshot.put(shard, lastProcessed);
-     }
-     return snapshot;
- }
-
- /**
-  * Starts shutting down the fetcher. Must be called to allow {@link KinesisDataFetcher#runFetcher()} to complete.
-  * Once called, the running flag is cleared, the thread executing {@link KinesisDataFetcher#runFetcher()} (if any)
-  * is interrupted, and all shard consuming threads are interrupted.
-  *
-  * <p>It is safe to call this method before {@link KinesisDataFetcher#runFetcher()} has been invoked; in that case
-  * there is no main thread to interrupt, and a later call to {@link KinesisDataFetcher#runFetcher()} returns
-  * immediately.
-  */
- public void shutdownFetcher() {
-     running = false;
-
-     // mainThread is only assigned once runFetcher() has started, so it may still be null here;
-     // read it once into a local so that the null check and the interrupt see the same value
-     final Thread fetcherThread = mainThread;
-     if (fetcherThread != null) {
-         fetcherThread.interrupt(); // the main thread may be sleeping for the discovery interval
-     }
-
-     if (LOG.isInfoEnabled()) {
-         LOG.info("Shutting down the shard consumer threads of subtask {} ...", indexOfThisConsumerSubtask);
-     }
-     shardConsumersExecutor.shutdownNow();
- }
-
- /**
-  * After calling {@link KinesisDataFetcher#shutdownFetcher()}, this can be called to await the fetcher shutdown.
-  * Blocks until the shard consumer executor has terminated.
-  *
-  * @throws InterruptedException if the calling thread is interrupted while waiting
-  */
- public void awaitTermination() throws InterruptedException {
-     // block on the executor instead of polling isTerminated() with short sleeps;
-     // a timeout only means the consumer threads have not finished yet, so keep waiting
-     while (!shardConsumersExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
-         if (LOG.isDebugEnabled()) {
-             LOG.debug("Subtask {} is still waiting for its shard consumer threads to terminate ...",
-                 indexOfThisConsumerSubtask);
-         }
-     }
- }
-
- /** Called by created threads to pass on errors. Only the first thrown error is set.
- * Once set, the shutdown process will be executed and all shard consuming threads will be interrupted. */
- protected void stopWithError(Throwable throwable) {
- if (this.error.compareAndSet(null, throwable)) {
- shutdownFetcher();
- }
- }
-
- // ------------------------------------------------------------------------
- // Functions that update the subscribedStreamToLastDiscoveredShardIds state
- // ------------------------------------------------------------------------
-
- /**
-  * Updates the last discovered shard of a subscribed stream; only updates if the update is valid.
-  * The update is valid when no shard id has been recorded for the stream yet, or when the given shard id
-  * compares greater than the recorded one.
-  *
-  * @param stream the stream whose last discovered shard is to be advanced
-  * @param shardId the candidate shard id
-  */
- public void advanceLastDiscoveredShardOfStream(String stream, String shardId) {
-     final String previousShardId = this.subscribedStreamsToLastDiscoveredShardIds.get(stream);
-
-     // short-circuits, so shard ids are only compared when a previous shard id exists
-     final boolean isAdvance = previousShardId == null
-         || KinesisStreamShard.compareShardIds(shardId, previousShardId) > 0;
-
-     if (isAdvance) {
-         this.subscribedStreamsToLastDiscoveredShardIds.put(stream, shardId);
-     }
- }
-
- /**
- * A utility function that does the following:
- *
- * 1. Find new shards for each stream that we haven't seen before
- * 2. For each new shard, determine whether this consumer subtask should subscribe to them;
- * if yes, it is added to the returned list of shards
- * 3. Update the subscribedStreamsToLastDiscoveredShardIds state so that we won't get shards
- * that we have already seen before the next time this function is called
- */
- private List<KinesisStreamShard> discoverNewShardsToSubscribe() throws InterruptedException {
-
- List<KinesisStreamShard> newShardsToSubscribe = new LinkedList<>();
-
- GetShardListResult shardListResult = kinesis.getShardList(subscribedStreamsToLastDiscoveredShardIds);
- if (shardListResult.hasRetrievedShards()) {
- Set<String> streamsWithNewShards = shardListResult.getStreamsWithRetrievedShards();
-
- for (String stream : streamsWithNewShards) {
- List<KinesisStreamShard> newShardsOfStream = shardListResult.getRetrievedShardListOfStream(stream);
- for (KinesisStreamShard newShard : newShardsOfStream) {
- if (isThisSubtaskShouldSubscribeTo(newShard, totalNumberOfConsumerSubtasks, indexOfThisConsumerSubtask)) {
- newShardsToSubscribe.add(newShard);
- }
- }
-
- advanceLastDiscoveredShardOfStream(
- stream, shardListResult.getLastSeenShardOfStream(stream).getShard().getShardId());
- }
- }
-
- return newShardsToSubscribe;
- }
-
- // ------------------------------------------------------------------------
- // Functions to get / set information about the consumer
- // ------------------------------------------------------------------------
-
- /**
- * Sets whether this fetcher is being restored from a failure. In {@link KinesisDataFetcher#runFetcher()}, the
- * flag decides the starting position of shards discovered on startup: the earliest sequence number when
- * restoring, the configured initial position otherwise.
- *
- * @param bool {@code true} if the fetcher is restoring from a failure
- */
- public void setIsRestoringFromFailure(boolean bool) {
- this.isRestoredFromFailure = bool;
- }
-
- /**
- * Returns the consumer configuration properties this fetcher was created with.
- *
- * @return the configuration properties (the same instance held by the fetcher, not a copy)
- */
- protected Properties getConsumerConfiguration() {
- return configProps;
- }
-
- protected KinesisDeserializationSchema<T> getClonedDeserializationSchema() {
- try {
- return InstantiationUtil.clone(deserializationSchema, runtimeContext.getUserCodeClassLoader());
- } catch (IOException | ClassNotFoundException ex) {
- // this really shouldn't happen; simply wrap it around a runtime exception
- throw new RuntimeException(ex);
- }
- }
-
- // ------------------------------------------------------------------------
- // Thread-safe operations for record emitting and shard state updating
- // that assure atomicity with respect to the checkpoint lock
- // ------------------------------------------------------------------------
-
- /**
- * Atomic operation to collect a record and update state to the sequence number of the record.
- * Both steps run inside a single synchronized block on the checkpoint lock.
- * This method is called by {@link ShardConsumer}s.
- *
- * @param record the record to collect
- * @param recordTimestamp timestamp to attach to the collected record
- * @param shardStateIndex index of the shard to update in subscribedShardsState;
- * this index should be the returned value from
- * {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}, called
- * when the shard state was registered.
- * @param lastSequenceNumber the last sequence number value to update
- */
- protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
- synchronized (checkpointLock) {
- sourceContext.collectWithTimestamp(record, recordTimestamp);
- // updateState() synchronizes on the same lock again, which is fine as Java monitors are reentrant
- updateState(shardStateIndex, lastSequenceNumber);
- }
- }
-
- /**
- * Update the shard to last processed sequence number state.
- * This method is called by {@link ShardConsumer}s.
- *
- * @param shardStateIndex index of the shard to update in subscribedShardsState;
- * this index should be the returned value from
- * {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}, called
- * when the shard state was registered.
- * @param lastSequenceNumber the last sequence number value to update
- */
- protected void updateState(int shardStateIndex, SequenceNumber lastSequenceNumber) {
- synchronized (checkpointLock) {
- subscribedShardsState.get(shardStateIndex).setLastProcessedSequenceNum(lastSequenceNumber);
-
- // if a shard's state is updated to be SENTINEL_SHARD_ENDING_SEQUENCE_NUM by its consumer thread,
- // we've finished reading the shard and should determine it to be non-active
- if (lastSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
- this.numberOfActiveShards.decrementAndGet();
- LOG.info("Subtask {} has reached the end of subscribed shard: {}",
- indexOfThisConsumerSubtask, subscribedShardsState.get(shardStateIndex).getKinesisStreamShard());
- }
- }
- }
-
- /**
-  * Appends a new subscribed shard state to the list of subscribed shards, under the checkpoint lock.
-  * The shard is counted as active unless its state is already the shard-ending sentinel, which is the case
-  * when the shard had been completely read before a failure and restore.
-  *
-  * @param newSubscribedShardState the new shard state that this fetcher is to be subscribed to
-  * @return the index of the new state in subscribedShardsState
-  */
- public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribedShardState) {
-     synchronized (checkpointLock) {
-         subscribedShardsState.add(newSubscribedShardState);
-         final int newStateIndex = subscribedShardsState.size() - 1;
-
-         final boolean alreadyFullyRead = newSubscribedShardState.getLastProcessedSequenceNum()
-             .equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
-         if (!alreadyFullyRead) {
-             this.numberOfActiveShards.incrementAndGet();
-         }
-
-         return newStateIndex;
-     }
- }
-
- // ------------------------------------------------------------------------
- // Miscellaneous utility functions
- // ------------------------------------------------------------------------
-
- /**
- * Utility function to determine whether a shard should be subscribed by this consumer subtask.
- *
- * @param shard the shard to determine
- * @param totalNumberOfConsumerSubtasks total number of consumer subtasks
- * @param indexOfThisConsumerSubtask index of this consumer subtask
- */
- private static boolean isThisSubtaskShouldSubscribeTo(KinesisStreamShard shard,
- int totalNumberOfConsumerSubtasks,
- int indexOfThisConsumerSubtask) {
- return (Math.abs(shard.hashCode() % totalNumberOfConsumerSubtasks)) == indexOfThisConsumerSubtask;
- }
-
- /**
-  * Creates the cached thread pool that runs the shard consumers of one subtask. The pool creates daemon
-  * threads named {@code shardConsumers-<subtaskName>-thread-<n>}, where {@code n} counts up from 0 in
-  * thread creation order.
-  *
-  * @param subtaskName name of the subtask, used as part of the thread names
-  * @return the executor service for the shard consumers
-  */
- private static ExecutorService createShardConsumersThreadPool(final String subtaskName) {
-     return Executors.newCachedThreadPool(new ThreadFactory() {
-         // the counter must live in the factory rather than in newThread(); otherwise it is recreated
-         // for every thread and all threads end up with the suffix 0
-         private final AtomicLong threadCount = new AtomicLong(0);
-
-         @Override
-         public Thread newThread(Runnable runnable) {
-             Thread thread = new Thread(runnable);
-             thread.setName("shardConsumers-" + subtaskName + "-thread-" + threadCount.getAndIncrement());
-             thread.setDaemon(true);
-             return thread;
-         }
-     });
- }
-
- /**
- * Utility function to create an initial map of the last discovered shard id of each subscribed stream, set to null;
- * This is called in the constructor; correct values will be set later on by calling advanceLastDiscoveredShardOfStream()
- *
- * @param streams the list of subscribed streams
- * @return the initial map for subscribedStreamsToLastDiscoveredShardIds
- */
- protected static HashMap<String, String> createInitialSubscribedStreamsToLastDiscoveredShardsState(List<String> streams) {
- HashMap<String, String> initial = new HashMap<>();
- for (String stream : streams) {
- initial.put(stream, null);
- }
- return initial;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
deleted file mode 100644
index 612a4a7..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.internals;
-
-import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
-import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
-import com.amazonaws.services.kinesis.model.GetRecordsResult;
-import com.amazonaws.services.kinesis.model.Record;
-import com.amazonaws.services.kinesis.model.ShardIteratorType;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
-import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
-import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
-import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
-import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
-import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Properties;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Thread that does the actual data pulling from AWS Kinesis shards. Each thread is in charge of one Kinesis shard only.
- *
- * <p>Records are fetched in batches through a {@link KinesisProxyInterface}, deaggregated, deserialized with the
- * deserialization schema obtained from the owning {@link KinesisDataFetcher}, and handed back to that fetcher
- * together with the sequence number that was just collected.
- *
- * @param <T> the type of the values produced by the deserialization schema
- */
-public class ShardConsumer<T> implements Runnable {
-
- private static final Logger LOG = LoggerFactory.getLogger(ShardConsumer.class);
-
- // schema used to turn the raw bytes of each record into a value of type T
- private final KinesisDeserializationSchema<T> deserializer;
-
- // proxy through which all getShardIterator() / getRecords() calls are made
- private final KinesisProxyInterface kinesis;
-
- // index handed back to the fetcher whenever the state of the subscribed shard is updated
- private final int subscribedShardStateIndex;
-
- // owning fetcher; receives emitted records, state updates and errors
- private final KinesisDataFetcher<T> fetcherRef;
-
- private final KinesisStreamShard subscribedShard;
-
- // read from ConsumerConfigConstants.SHARD_GETRECORDS_MAX
- private final int maxNumberOfRecordsPerFetch;
- // read from ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS; 0 means "do not sleep between fetches"
- private final long fetchIntervalMillis;
-
- // starting position at construction time; afterwards the sequence number of the last collected record
- private SequenceNumber lastSequenceNum;
-
- /**
- * Creates a shard consumer. The Kinesis proxy is created from the consumer configuration of the given fetcher.
- *
- * @param fetcherRef reference to the owning fetcher
- * @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to
- * @param subscribedShard the shard this consumer is subscribed to
- * @param lastSequenceNum the sequence number in the shard to start consuming
- */
- public ShardConsumer(KinesisDataFetcher<T> fetcherRef,
- Integer subscribedShardStateIndex,
- KinesisStreamShard subscribedShard,
- SequenceNumber lastSequenceNum) {
- this(fetcherRef,
- subscribedShardStateIndex,
- subscribedShard,
- lastSequenceNum,
- KinesisProxy.create(fetcherRef.getConsumerConfiguration()));
- }
-
- /**
- * This constructor is exposed for testing purposes; it allows the Kinesis proxy to be supplied by the caller.
- *
- * <p>All arguments except {@code kinesis} are null-checked, and a starting position equal to
- * {@link SentinelSequenceNumber#SENTINEL_SHARD_ENDING_SEQUENCE_NUM} is rejected because such a shard has
- * already been read completely.
- *
- * @param fetcherRef reference to the owning fetcher
- * @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to
- * @param subscribedShard the shard this consumer is subscribed to
- * @param lastSequenceNum the sequence number in the shard to start consuming
- * @param kinesis the proxy used to talk to Kinesis
- */
- protected ShardConsumer(KinesisDataFetcher<T> fetcherRef,
- Integer subscribedShardStateIndex,
- KinesisStreamShard subscribedShard,
- SequenceNumber lastSequenceNum,
- KinesisProxyInterface kinesis) {
- this.fetcherRef = checkNotNull(fetcherRef);
- this.subscribedShardStateIndex = checkNotNull(subscribedShardStateIndex);
- this.subscribedShard = checkNotNull(subscribedShard);
- this.lastSequenceNum = checkNotNull(lastSequenceNum);
- checkArgument(
- !lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()),
- "Should not start a ShardConsumer if the shard has already been completely read.");
-
- this.deserializer = fetcherRef.getClonedDeserializationSchema();
-
- Properties consumerConfig = fetcherRef.getConsumerConfiguration();
- // NOTE(review): unlike the other arguments the proxy is not null-checked here
- this.kinesis = kinesis;
- // both settings fall back to the defaults from ConsumerConfigConstants when not configured
- this.maxNumberOfRecordsPerFetch = Integer.valueOf(consumerConfig.getProperty(
- ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
- Integer.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX)));
- this.fetchIntervalMillis = Long.valueOf(consumerConfig.getProperty(
- ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
- Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS)));
- }
-
- /**
- * Determines the initial shard iterator from the starting sequence number, then keeps fetching, deaggregating
- * and collecting records until the thread is interrupted or the shard has no next iterator. Reaching the end
- * of the shard is recorded in the fetcher as {@link SentinelSequenceNumber#SENTINEL_SHARD_ENDING_SEQUENCE_NUM}.
- * Any {@link Throwable} raised along the way is handed to the fetcher through {@code stopWithError}.
- */
- @SuppressWarnings("unchecked")
- @Override
- public void run() {
- String nextShardItr;
-
- try {
- // before infinitely looping, we set the initial nextShardItr appropriately
-
- if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get())) {
- // if the shard is already closed, there will be no latest next record to get for this shard
- if (subscribedShard.isClosed()) {
- nextShardItr = null;
- } else {
- nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), null);
- }
- } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get())) {
- nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.TRIM_HORIZON.toString(), null);
- } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
- // NOTE(review): the constructor rejects this sentinel via checkArgument, so this branch looks
- // unreachable from here - confirm before relying on it
- nextShardItr = null;
- } else {
- // we will be starting from an actual sequence number (due to restore from failure).
- // if the last sequence number refers to an aggregated record, we need to clean up any dangling sub-records
- // from the last aggregated record; otherwise, we can simply start iterating from the record right after.
-
- if (lastSequenceNum.isAggregated()) {
- String itrForLastAggregatedRecord =
- kinesis.getShardIterator(subscribedShard, ShardIteratorType.AT_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
-
- // get only the last aggregated record
- GetRecordsResult getRecordsResult = getRecords(itrForLastAggregatedRecord, 1);
-
- List<UserRecord> fetchedRecords = deaggregateRecords(
- getRecordsResult.getRecords(),
- subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
- subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
-
- // captured before the loop because collecting a record overwrites lastSequenceNum
- long lastSubSequenceNum = lastSequenceNum.getSubSequenceNumber();
- for (UserRecord record : fetchedRecords) {
- // we have found a dangling sub-record if it has a larger subsequence number
- // than our last sequence number; if so, collect the record and update state
- if (record.getSubSequenceNumber() > lastSubSequenceNum) {
- deserializeRecordForCollectionAndUpdateState(record);
- }
- }
-
- // set the nextShardItr so we can continue iterating in the next while loop
- nextShardItr = getRecordsResult.getNextShardIterator();
- } else {
- // the last record was non-aggregated, so we can simply start from the next record
- nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
- }
- }
-
- while(isRunning()) {
- if (nextShardItr == null) {
- // a null iterator means there is nothing more to read from this shard
- fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
-
- // we can close this consumer thread once we've reached the end of the subscribed shard
- break;
- } else {
- // throttle between fetches; an interrupt during the sleep surfaces as an
- // InterruptedException and is handled by the catch block below
- if (fetchIntervalMillis != 0) {
- Thread.sleep(fetchIntervalMillis);
- }
-
- GetRecordsResult getRecordsResult = getRecords(nextShardItr, maxNumberOfRecordsPerFetch);
-
- // each of the Kinesis records may be aggregated, so we must deaggregate them before proceeding
- List<UserRecord> fetchedRecords = deaggregateRecords(
- getRecordsResult.getRecords(),
- subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
- subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
-
- for (UserRecord record : fetchedRecords) {
- deserializeRecordForCollectionAndUpdateState(record);
- }
-
- nextShardItr = getRecordsResult.getNextShardIterator();
- }
- }
- } catch (Throwable t) {
- // every failure, including an InterruptedException from sleep(), is reported to the fetcher
- fetcherRef.stopWithError(t);
- }
- }
-
- /**
- * The loop in run() checks this before fetching next batch of records. Since this runnable will be executed
- * by the ExecutorService {@link KinesisDataFetcher#shardConsumersExecutor}, the only way to close down this thread
- * would be by calling shutdownNow() on {@link KinesisDataFetcher#shardConsumersExecutor} and let the executor service
- * interrupt all currently running {@link ShardConsumer}s.
- *
- * Note that {@link Thread#interrupted()} clears the interrupt status of the current thread as a side effect.
- *
- * @return false if the current thread had been interrupted, true otherwise
- */
- private boolean isRunning() {
- return !Thread.interrupted();
- }
-
- /**
- * Deserializes a record for collection, and accordingly updates the shard state in the fetcher. The last
- * successfully collected sequence number in this shard consumer is also updated so that
- * {@link ShardConsumer#getRecords(String, int)} may be able to use the correct sequence number to refresh shard
- * iterators if necessary.
- *
- * Note that the server-side Kinesis timestamp is attached to the record when collected. When the
- * user programs uses {@link TimeCharacteristic#EventTime}, this timestamp will be used by default.
- *
- * @param record record to deserialize and collect
- * @throws IOException
- */
- private void deserializeRecordForCollectionAndUpdateState(UserRecord record)
- throws IOException {
- ByteBuffer recordData = record.getData();
-
- // copy the remaining bytes out of the buffer; this advances the buffer's position
- byte[] dataBytes = new byte[recordData.remaining()];
- recordData.get(dataBytes);
-
- final long approxArrivalTimestamp = record.getApproximateArrivalTimestamp().getTime();
-
- final T value = deserializer.deserialize(
- dataBytes,
- record.getPartitionKey(),
- record.getSequenceNumber(),
- approxArrivalTimestamp,
- subscribedShard.getStreamName(),
- subscribedShard.getShard().getShardId());
-
- // sub-records of an aggregated record additionally carry their subsequence number
- SequenceNumber collectedSequenceNumber = (record.isAggregated())
- ? new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber())
- : new SequenceNumber(record.getSequenceNumber());
-
- fetcherRef.emitRecordAndUpdateState(
- value,
- approxArrivalTimestamp,
- subscribedShardStateIndex,
- collectedSequenceNumber);
-
- // only updated after the record has been emitted to the fetcher
- lastSequenceNum = collectedSequenceNumber;
- }
-
- /**
- * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected
- * AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on
- * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should
- * be used for the next call to this method.
- *
- * Note: it is important that this method is not called again before all the records from the last result have been
- * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise
- * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to
- * incorrect shard iteration if the iterator had to be refreshed.
- *
- * @param shardItr shard iterator to use
- * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt
- * @return get records result
- * @throws InterruptedException
- */
- private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException {
- GetRecordsResult getRecordsResult = null;
- // retry until a result is obtained; only an expired iterator is retried, other exceptions propagate
- while (getRecordsResult == null) {
- try {
- getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);
- } catch (ExpiredIteratorException eiEx) {
- LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
- " refreshing the iterator ...", shardItr, subscribedShard);
- // NOTE(review): if no record has been collected yet, lastSequenceNum may still hold a sentinel
- // value (LATEST / EARLIEST) rather than a real sequence number - confirm this refresh is valid then
- shardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
-
- // sleep for the fetch interval before the next getRecords attempt with the refreshed iterator
- if (fetchIntervalMillis != 0) {
- Thread.sleep(fetchIntervalMillis);
- }
- }
- }
- return getRecordsResult;
- }
-
- /**
- * Deaggregates the given Kinesis records into user records by delegating to
- * {@link UserRecord#deaggregate}, passing the two hash keys converted to {@link BigInteger}s.
- *
- * @param records the fetched Kinesis records, which may be aggregated
- * @param startingHashKey starting hash key of the shard's hash key range, as a decimal string
- * @param endingHashKey ending hash key of the shard's hash key range, as a decimal string
- * @return the deaggregated user records
- */
- @SuppressWarnings("unchecked")
- protected static List<UserRecord> deaggregateRecords(List<Record> records, String startingHashKey, String endingHashKey) {
- return UserRecord.deaggregate(records, new BigInteger(startingHashKey), new BigInteger(endingHashKey));
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
deleted file mode 100644
index 53ed11b..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.model;
-
-import com.amazonaws.services.kinesis.model.Shard;
-
-import java.io.Serializable;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Serializable wrapper around an AWS Kinesis {@link com.amazonaws.services.kinesis.model.Shard} that also records
- * the name of the stream the shard belongs to. Besides the plain accessors it offers a check for whether the
- * shard is closed, and static helpers to validate and compare shard ids.
- *
- * <p>Two instances are equal when both the stream name and the wrapped shard are equal; the hash code is
- * derived from the stream name and the shard id only and is computed once at construction time.
- */
-public class KinesisStreamShard implements Serializable {
-
-	private static final long serialVersionUID = -6004217801761077536L;
-
-	private final String streamName;
-	private final Shard shard;
-
-	private final int cachedHash;
-
-	/**
-	 * Create a new KinesisStreamShard.
-	 *
-	 * @param streamName
-	 *           the name of the Kinesis stream that this shard belongs to
-	 * @param shard
-	 *           the actual AWS Shard instance that will be wrapped within this KinesisStreamShard
-	 */
-	public KinesisStreamShard(String streamName, Shard shard) {
-		this.streamName = checkNotNull(streamName);
-		this.shard = checkNotNull(shard);
-
-		// A shard is fully identified by stream name plus shard id, so the hash deliberately ignores
-		// the remaining details that Amazon's own Shard.hashCode() would take into account.
-		this.cachedHash = 37 * (37 * 17 + streamName.hashCode()) + shard.getShardId().hashCode();
-	}
-
-	public String getStreamName() {
-		return streamName;
-	}
-
-	/**
-	 * A shard counts as closed once its sequence number range has an ending sequence number.
-	 *
-	 * @return true if the wrapped shard has an ending sequence number
-	 */
-	public boolean isClosed() {
-		final String endingSequenceNumber = shard.getSequenceNumberRange().getEndingSequenceNumber();
-		return endingSequenceNumber != null;
-	}
-
-	public Shard getShard() {
-		return shard;
-	}
-
-	@Override
-	public String toString() {
-		return "KinesisStreamShard{"
-			+ "streamName='" + streamName + "'"
-			+ ", shard='" + shard.toString() + "'}";
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj == this) {
-			return true;
-		}
-		if (!(obj instanceof KinesisStreamShard)) {
-			return false;
-		}
-
-		final KinesisStreamShard that = (KinesisStreamShard) obj;
-		return streamName.equals(that.getStreamName()) && shard.equals(that.getShard());
-	}
-
-	@Override
-	public int hashCode() {
-		return cachedHash;
-	}
-
-	/**
-	 * Utility function to compare two shard ids by their numeric part.
-	 *
-	 * @param firstShardId first shard id to compare
-	 * @param secondShardId second shard id to compare
-	 * @return a value less than 0 if the first shard id is smaller than the second shard id,
-	 *         a value larger than 0 if the first shard id is larger than the second shard id,
-	 *         or 0 if they are equal
-	 * @throws IllegalArgumentException if either shard id does not have a valid format
-	 */
-	public static int compareShardIds(String firstShardId, String secondShardId) {
-		if (!isValidShardId(firstShardId)) {
-			throw new IllegalArgumentException("The first shard id has invalid format.");
-		}
-		if (!isValidShardId(secondShardId)) {
-			throw new IllegalArgumentException("The second shard id has invalid format.");
-		}
-
-		// the numeric part follows the 8-character "shardId-" prefix
-		final long firstNumber = Long.parseLong(firstShardId.substring(8));
-		final long secondNumber = Long.parseLong(secondShardId.substring(8));
-		return Long.compare(firstNumber, secondNumber);
-	}
-
-	/**
-	 * Checks if a shard id has valid format.
-	 * Kinesis stream shard ids consist of the prefix "shardId-" followed by a 12-digit number
-	 * that is left-padded with 0's, e.g. "shardId-000000000015".
-	 *
-	 * @param shardId the shard id to check; may be null
-	 * @return whether the shard id is valid (null is never valid)
-	 */
-	public static boolean isValidShardId(String shardId) {
-		return shardId != null && shardId.matches("^shardId-\\d{12}");
-	}
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
deleted file mode 100644
index 00181da..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.model;
-
-/**
- * Pairs a {@link KinesisStreamShard} with the sequence number that was processed last for that shard.
- * The shard is fixed at construction time, while the sequence number can be replaced through
- * {@link #setLastProcessedSequenceNum(SequenceNumber)}.
- */
-public class KinesisStreamShardState {
-
-	private KinesisStreamShard kinesisStreamShard;
-	private SequenceNumber lastProcessedSequenceNum;
-
-	/**
-	 * @param kinesisStreamShard the shard this state belongs to
-	 * @param lastProcessedSequenceNum the initial last processed sequence number of the shard
-	 */
-	public KinesisStreamShardState(KinesisStreamShard kinesisStreamShard, SequenceNumber lastProcessedSequenceNum) {
-		this.kinesisStreamShard = kinesisStreamShard;
-		this.lastProcessedSequenceNum = lastProcessedSequenceNum;
-	}
-
-	public KinesisStreamShard getKinesisStreamShard() {
-		return kinesisStreamShard;
-	}
-
-	public SequenceNumber getLastProcessedSequenceNum() {
-		return lastProcessedSequenceNum;
-	}
-
-	public void setLastProcessedSequenceNum(SequenceNumber update) {
-		lastProcessedSequenceNum = update;
-	}
-
-	@Override
-	public String toString() {
-		final String shardText = kinesisStreamShard.toString();
-		final String sequenceNumberText = lastProcessedSequenceNum.toString();
-		return "KinesisStreamShardState{"
-			+ "kinesisStreamShard='" + shardText + "'"
-			+ ", lastProcessedSequenceNumber='" + sequenceNumberText + "'}";
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj == this) {
-			return true;
-		}
-		if (!(obj instanceof KinesisStreamShardState)) {
-			return false;
-		}
-
-		final KinesisStreamShardState that = (KinesisStreamShardState) obj;
-		return kinesisStreamShard.equals(that.getKinesisStreamShard())
-			&& lastProcessedSequenceNum.equals(that.getLastProcessedSequenceNum());
-	}
-
-	@Override
-	public int hashCode() {
-		final int shardHash = kinesisStreamShard.hashCode();
-		final int sequenceNumberHash = lastProcessedSequenceNum.hashCode();
-		return 37 * (shardHash + sequenceNumberHash);
-	}
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
deleted file mode 100644
index 8182201..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.model;
-
-import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
-import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
-
-/**
- * Marker sequence numbers that stand for special positions in a shard rather than for an actual record.
- * The value is initially set by {@link FlinkKinesisConsumer} when {@link KinesisDataFetcher}s are created.
- * The KinesisDataFetchers will use this value to determine how to retrieve the starting shard iterator from AWS Kinesis.
- */
-public enum SentinelSequenceNumber {
-
-	/**
-	 * Reading of the shard should start from the latest incoming records.
-	 */
-	SENTINEL_LATEST_SEQUENCE_NUM(new SequenceNumber("LATEST_SEQUENCE_NUM")),
-
-	/**
-	 * Reading of the shard should start from the earliest records that haven't expired yet.
-	 */
-	SENTINEL_EARLIEST_SEQUENCE_NUM(new SequenceNumber("EARLIEST_SEQUENCE_NUM")),
-
-	/**
-	 * The last record of the shard has already been read.
-	 * (Note: Kinesis shards that have been closed due to a split or merge will have an ending data record.)
-	 */
-	SENTINEL_SHARD_ENDING_SEQUENCE_NUM(new SequenceNumber("SHARD_ENDING_SEQUENCE_NUM"));
-
-	/** The sequence number instance that represents this sentinel. */
-	private final SequenceNumber sentinel;
-
-	SentinelSequenceNumber(SequenceNumber sentinelValue) {
-		this.sentinel = sentinelValue;
-	}
-
-	/**
-	 * @return the {@link SequenceNumber} that represents this sentinel
-	 */
-	public SequenceNumber get() {
-		return sentinel;
-	}
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java
deleted file mode 100644
index 021f53f..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.model;
-
-import java.io.Serializable;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Serializable sequence number of a Kinesis record, made up of the main sequence number and a subsequence
- * number. For a sub-record of an aggregated Kinesis record the subsequence number is non-negative and gives
- * the position of the sub-record within the aggregation; for a non-aggregated record it is -1.
- */
-public class SequenceNumber implements Serializable {
-
-	private static final long serialVersionUID = 876972197938972667L;
-
-	private static final String DELIMITER = "-";
-
-	private final String sequenceNumber;
-	private final long subSequenceNumber;
-
-	private final int cachedHash;
-
-	/**
-	 * Create a new instance for a non-aggregated Kinesis record, i.e. one whose subsequence number is -1.
-	 *
-	 * @param sequenceNumber the sequence number
-	 */
-	public SequenceNumber(String sequenceNumber) {
-		this(sequenceNumber, -1);
-	}
-
-	/**
-	 * Create a new instance, with the specified sequence number and subsequence number.
-	 * To represent the sequence number for a non-aggregated Kinesis record, the subsequence number should be -1.
-	 * Otherwise, give a non-negative subsequence number to represent an aggregated Kinesis record.
-	 *
-	 * @param sequenceNumber the sequence number; must not be null
-	 * @param subSequenceNumber the subsequence number (-1 to represent non-aggregated Kinesis records)
-	 */
-	public SequenceNumber(String sequenceNumber, long subSequenceNumber) {
-		this.sequenceNumber = checkNotNull(sequenceNumber);
-		this.subSequenceNumber = subSequenceNumber;
-
-		// both fields are immutable, so the hash is computed once up front
-		final int mainHash = sequenceNumber.hashCode();
-		final int subHash = Long.valueOf(subSequenceNumber).hashCode();
-		this.cachedHash = 37 * (mainHash + subHash);
-	}
-
-	/**
-	 * @return true if the subsequence number is non-negative
-	 */
-	public boolean isAggregated() {
-		return subSequenceNumber >= 0;
-	}
-
-	public String getSequenceNumber() {
-		return sequenceNumber;
-	}
-
-	public long getSubSequenceNumber() {
-		return subSequenceNumber;
-	}
-
-	/**
-	 * @return the sequence number, followed by the delimiter and the subsequence number if aggregated
-	 */
-	@Override
-	public String toString() {
-		return isAggregated()
-			? sequenceNumber + DELIMITER + subSequenceNumber
-			: sequenceNumber;
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj == this) {
-			return true;
-		}
-		if (!(obj instanceof SequenceNumber)) {
-			return false;
-		}
-
-		final SequenceNumber that = (SequenceNumber) obj;
-		return sequenceNumber.equals(that.getSequenceNumber())
-			&& subSequenceNumber == that.getSubSequenceNumber();
-	}
-
-	@Override
-	public int hashCode() {
-		return cachedHash;
-	}
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
deleted file mode 100644
index 04b1654..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.proxy;
-
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Basic model class to bundle the shards retrieved from Kinesis on a {@link KinesisProxyInterface#getShardList(Map)} call.
- * Shards are grouped by the name of the stream they were retrieved for and kept in insertion order; a stream
- * only shows up in the result once at least one shard has been added for it.
- */
-public class GetShardListResult {
-
-	private final Map<String, LinkedList<KinesisStreamShard>> streamsToRetrievedShardList = new HashMap<>();
-
-	/**
-	 * Appends a single shard to the shards retrieved for the given stream.
-	 *
-	 * @param stream the stream the shard belongs to
-	 * @param retrievedShard the shard to append
-	 */
-	public void addRetrievedShardToStream(String stream, KinesisStreamShard retrievedShard) {
-		shardListForAppending(stream).add(retrievedShard);
-	}
-
-	/**
-	 * Appends all given shards to the shards retrieved for the given stream.
-	 * An empty list is ignored and does not register the stream.
-	 *
-	 * @param stream the stream the shards belong to
-	 * @param retrievedShards the shards to append
-	 */
-	public void addRetrievedShardsToStream(String stream, List<KinesisStreamShard> retrievedShards) {
-		if (retrievedShards.size() == 0) {
-			return;
-		}
-		shardListForAppending(stream).addAll(retrievedShards);
-	}
-
-	/**
-	 * @param stream the stream to look up
-	 * @return the shards retrieved for the stream, or null if none were retrieved for it
-	 */
-	public List<KinesisStreamShard> getRetrievedShardListOfStream(String stream) {
-		return streamsToRetrievedShardList.get(stream);
-	}
-
-	/**
-	 * @param stream the stream to look up
-	 * @return the shard that was added last for the stream, or null if none were retrieved for it
-	 */
-	public KinesisStreamShard getLastSeenShardOfStream(String stream) {
-		final LinkedList<KinesisStreamShard> shardsOfStream = streamsToRetrievedShardList.get(stream);
-		return (shardsOfStream == null) ? null : shardsOfStream.getLast();
-	}
-
-	public boolean hasRetrievedShards() {
-		return !streamsToRetrievedShardList.isEmpty();
-	}
-
-	public Set<String> getStreamsWithRetrievedShards() {
-		return streamsToRetrievedShardList.keySet();
-	}
-
-	/** Returns the shard list registered for the stream, registering a new empty one first if necessary. */
-	private LinkedList<KinesisStreamShard> shardListForAppending(String stream) {
-		LinkedList<KinesisStreamShard> shardsOfStream = streamsToRetrievedShardList.get(stream);
-		if (shardsOfStream == null) {
-			shardsOfStream = new LinkedList<KinesisStreamShard>();
-			streamsToRetrievedShardList.put(stream, shardsOfStream);
-		}
-		return shardsOfStream;
-	}
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
deleted file mode 100644
index 9ffc8e6..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ /dev/null
@@ -1,338 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.proxy;
-
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
-import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
-import com.amazonaws.services.kinesis.model.DescribeStreamResult;
-import com.amazonaws.services.kinesis.model.GetRecordsRequest;
-import com.amazonaws.services.kinesis.model.GetRecordsResult;
-import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
-import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
-import com.amazonaws.services.kinesis.model.LimitExceededException;
-import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
-import com.amazonaws.services.kinesis.model.StreamStatus;
-import com.amazonaws.services.kinesis.model.Shard;
-import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
-import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-import java.util.Map;
-import java.util.Random;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Kinesis proxy implementation - a utility class that wraps the AWS SDK Kinesis client and is
- * used to make calls to AWS Kinesis for several functions: getting the list of shards of streams,
- * getting a shard iterator, and fetching a batch of data records through a shard iterator.
- *
- * <p>NOTE:
- * In the AWS KCL library, there is a similar implementation - {@link com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy}.
- * This implementation differs mainly in that it can operate on arbitrary Kinesis streams, which is a needed
- * functionality for the Flink Kinesis Connector since the consumer may simultaneously read from multiple Kinesis streams.
- */
-public class KinesisProxy implements KinesisProxyInterface {
-
- private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class);
-
- /** The actual Kinesis client from the AWS SDK that all calls are delegated to. */
- private final AmazonKinesisClient kinesisClient;
-
- /** Shared random number generator used to draw the backoff jitter for Kinesis operations. */
- private static final Random seed = new Random();
-
- // ------------------------------------------------------------------------
- //  describeStream() related performance settings
- // ------------------------------------------------------------------------
-
- /** Base backoff millis for the describe stream operation. */
- private final long describeStreamBaseBackoffMillis;
-
- /** Maximum backoff millis for the describe stream operation. */
- private final long describeStreamMaxBackoffMillis;
-
- /** Exponential backoff power constant for the describe stream operation. */
- private final double describeStreamExpConstant;
-
- // ------------------------------------------------------------------------
- //  getRecords() related performance settings
- // ------------------------------------------------------------------------
-
- /** Base backoff millis for the get records operation. */
- private final long getRecordsBaseBackoffMillis;
-
- /** Maximum backoff millis for the get records operation. */
- private final long getRecordsMaxBackoffMillis;
-
- /** Exponential backoff power constant for the get records operation. */
- private final double getRecordsExpConstant;
-
- /** Maximum attempts for the get records operation. */
- private final int getRecordsMaxAttempts;
-
- // ------------------------------------------------------------------------
- //  getShardIterator() related performance settings
- // ------------------------------------------------------------------------
-
- /** Base backoff millis for the get shard iterator operation. */
- private final long getShardIteratorBaseBackoffMillis;
-
- /** Maximum backoff millis for the get shard iterator operation. */
- private final long getShardIteratorMaxBackoffMillis;
-
- /** Exponential backoff power constant for the get shard iterator operation. */
- private final double getShardIteratorExpConstant;
-
- /** Maximum attempts for the get shard iterator operation. */
- private final int getShardIteratorMaxAttempts;
-
- /**
-  * Create a new KinesisProxy based on the supplied configuration properties.
-  * Every backoff / retry setting falls back to its default from
-  * {@link ConsumerConfigConstants} when the corresponding key is absent.
-  *
-  * @param configProps configuration properties containing AWS credential and AWS region info
-  */
- private KinesisProxy(Properties configProps) {
-     checkNotNull(configProps);
-
-     this.kinesisClient = AWSUtil.createKinesisClient(configProps);
-
-     // describeStream() settings
-     this.describeStreamBaseBackoffMillis = longProperty(
-         configProps,
-         ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE,
-         ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE);
-     this.describeStreamMaxBackoffMillis = longProperty(
-         configProps,
-         ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX,
-         ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX);
-     this.describeStreamExpConstant = doubleProperty(
-         configProps,
-         ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT,
-         ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT);
-
-     // getRecords() settings
-     this.getRecordsBaseBackoffMillis = longProperty(
-         configProps,
-         ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE,
-         ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_BASE);
-     this.getRecordsMaxBackoffMillis = longProperty(
-         configProps,
-         ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX,
-         ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_MAX);
-     this.getRecordsExpConstant = doubleProperty(
-         configProps,
-         ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT,
-         ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT);
-     this.getRecordsMaxAttempts = intProperty(
-         configProps,
-         ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES,
-         ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_RETRIES);
-
-     // getShardIterator() settings
-     this.getShardIteratorBaseBackoffMillis = longProperty(
-         configProps,
-         ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE,
-         ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_BASE);
-     this.getShardIteratorMaxBackoffMillis = longProperty(
-         configProps,
-         ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX,
-         ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_MAX);
-     this.getShardIteratorExpConstant = doubleProperty(
-         configProps,
-         ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT,
-         ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT);
-     this.getShardIteratorMaxAttempts = intProperty(
-         configProps,
-         ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES,
-         ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_RETRIES);
- }
-
- /** Reads {@code key} as a long, using the decimal form of {@code defaultValue} when the key is absent. */
- private static long longProperty(Properties props, String key, long defaultValue) {
-     return Long.parseLong(props.getProperty(key, Long.toString(defaultValue)));
- }
-
- /** Reads {@code key} as a double, using the string form of {@code defaultValue} when the key is absent. */
- private static double doubleProperty(Properties props, String key, double defaultValue) {
-     return Double.parseDouble(props.getProperty(key, Double.toString(defaultValue)));
- }
-
- /** Reads {@code key} as an int, using the decimal form of {@code defaultValue} when the key is absent. */
- private static int intProperty(Properties props, String key, long defaultValue) {
-     return Integer.parseInt(props.getProperty(key, Long.toString(defaultValue)));
- }
-
- /**
-  * Creates a Kinesis proxy.
-  *
-  * @param configProps configuration properties
-  * @return the created kinesis proxy
-  */
- public static KinesisProxyInterface create(Properties configProps) {
-     return new KinesisProxy(configProps);
- }
-
- /**
-  * {@inheritDoc}
-  *
-  * <p>Each call that fails with {@link ProvisionedThroughputExceededException} is followed by a
-  * full-jitter backoff; once more than {@code getRecordsMaxAttempts} such failures have
-  * occurred, a {@link RuntimeException} is thrown.
-  */
- @Override
- public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException {
-     final GetRecordsRequest request = new GetRecordsRequest();
-     request.setShardIterator(shardIterator);
-     request.setLimit(maxRecordsToGet);
-
-     int failedAttempts = 0;
-     while (failedAttempts <= getRecordsMaxAttempts) {
-         try {
-             final GetRecordsResult fetched = kinesisClient.getRecords(request);
-             if (fetched != null) {
-                 return fetched;
-             }
-             // a null result without an exception simply leads to another call
-         } catch (ProvisionedThroughputExceededException ex) {
-             final long backoffMillis = fullJitterBackoff(
-                 getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, failedAttempts);
-             failedAttempts++;
-             LOG.warn("Got ProvisionedThroughputExceededException. Backing off for "
-                 + backoffMillis + " millis.");
-             Thread.sleep(backoffMillis);
-         }
-     }
-
-     throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxAttempts +
-         " retry attempts returned ProvisionedThroughputExceededException.");
- }
-
- /**
-  * {@inheritDoc}
-  *
-  * <p>The streams are queried one after another, in the iteration order of the given map.
-  */
- @Override
- public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) throws InterruptedException {
-     final GetShardListResult shardList = new GetShardListResult();
-
-     for (Map.Entry<String, String> entry : streamNamesWithLastSeenShardIds.entrySet()) {
-         final String streamName = entry.getKey();
-         final String lastSeenShardId = entry.getValue();
-         final List<KinesisStreamShard> shardsOfStream = getShardsOfStream(streamName, lastSeenShardId);
-         shardList.addRetrievedShardsToStream(streamName, shardsOfStream);
-     }
-
-     return shardList;
- }
-
- /**
-  * {@inheritDoc}
-  *
-  * <p>Each call that fails with {@link ProvisionedThroughputExceededException} is followed by a
-  * full-jitter backoff; once more than {@code getShardIteratorMaxAttempts} such failures have
-  * occurred, a {@link RuntimeException} is thrown.
-  */
- @Override
- public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException {
-     int failedAttempts = 0;
-     while (failedAttempts <= getShardIteratorMaxAttempts) {
-         try {
-             final GetShardIteratorResult fetched = kinesisClient.getShardIterator(
-                 shard.getStreamName(), shard.getShard().getShardId(), shardIteratorType, startingSeqNum);
-             if (fetched != null) {
-                 return fetched.getShardIterator();
-             }
-             // a null result without an exception simply leads to another call
-         } catch (ProvisionedThroughputExceededException ex) {
-             final long backoffMillis = fullJitterBackoff(
-                 getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, failedAttempts);
-             failedAttempts++;
-             LOG.warn("Got ProvisionedThroughputExceededException. Backing off for "
-                 + backoffMillis + " millis.");
-             Thread.sleep(backoffMillis);
-         }
-     }
-
-     throw new RuntimeException("Rate Exceeded for getShardIterator operation - all " + getShardIteratorMaxAttempts +
-         " retry attempts returned ProvisionedThroughputExceededException.");
- }
-
- /**
-  * Collects the shards of one stream, paging through describeStream results for as long as
-  * the stream description reports that more shards are available.
-  *
-  * @param streamName the stream whose shards are listed
-  * @param lastSeenShardId exclusive start shard id for the first describe call; may be null
-  * @return the shards returned by all describe calls, in the order they were returned
-  */
- private List<KinesisStreamShard> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException {
-     final List<KinesisStreamShard> collected = new ArrayList<>();
-
-     String exclusiveStartShardId = lastSeenShardId;
-     boolean morePages = true;
-     while (morePages) {
-         final DescribeStreamResult page = describeStream(streamName, exclusiveStartShardId);
-         final List<Shard> pageShards = page.getStreamDescription().getShards();
-
-         for (Shard pageShard : pageShards) {
-             collected.add(new KinesisStreamShard(streamName, pageShard));
-         }
-
-         // the next page starts after the last shard of this one; an empty page keeps the previous start id
-         if (!pageShards.isEmpty()) {
-             exclusiveStartShardId = pageShards.get(pageShards.size() - 1).getShardId();
-         }
-
-         morePages = page.getStreamDescription().isHasMoreShards();
-     }
-
-     return collected;
- }
-
- /**
-  * Get metainfo for a Kinesis stream, which contains information about which shards this Kinesis stream possesses.
-  *
-  * <p>This method is using a "full jitter" approach described in AWS's article,
-  * <a href="https://www.awsarchitectureblog.com/2015/03/backoff.html">"Exponential Backoff and Jitter"</a>.
-  * This is necessary because concurrent calls will be made by all parallel subtask's fetcher. This
-  * jitter backoff approach will help distribute calls across the fetchers over time.
-  *
-  * <p>The call is repeated without an attempt limit for as long as it fails with
-  * {@link LimitExceededException}; a {@link ResourceNotFoundException} is rethrown wrapped in a
-  * {@link RuntimeException}.
-  *
-  * @param streamName the stream to describe
-  * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result)
-  * @return the result of the describe stream operation
-  */
- private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException {
-     final DescribeStreamRequest request = new DescribeStreamRequest();
-     request.setStreamName(streamName);
-     request.setExclusiveStartShardId(startShardId);
-
-     // Call DescribeStream, with full-jitter backoff (if we get LimitExceededException).
-     DescribeStreamResult described = null;
-     int failedAttempts = 0;
-     while (described == null) { // retry until we get a result
-         try {
-             described = kinesisClient.describeStream(request);
-         } catch (LimitExceededException le) {
-             final long backoffMillis = fullJitterBackoff(
-                 describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, failedAttempts);
-             failedAttempts++;
-             LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for "
-                 + backoffMillis + " millis.");
-             Thread.sleep(backoffMillis);
-         } catch (ResourceNotFoundException re) {
-             throw new RuntimeException("Error while getting stream details", re);
-         }
-     }
-
-     // only warn about any status other than ACTIVE / UPDATING; the result is returned regardless
-     final String streamStatus = described.getStreamDescription().getStreamStatus();
-     final boolean activeOrUpdating =
-         streamStatus.equals(StreamStatus.ACTIVE.toString()) || streamStatus.equals(StreamStatus.UPDATING.toString());
-     if (!activeOrUpdating && LOG.isWarnEnabled()) {
-         LOG.warn("The status of stream " + streamName + " is " + streamStatus + "; result of the current " +
-             "describeStream operation will not contain any shard information.");
-     }
-
-     // Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before the exclusive
-     // start shard id in the returned shards list; check if we need to remove these erroneously returned shards
-     if (startShardId != null) {
-         final List<Shard> returnedShards = described.getStreamDescription().getShards();
-         for (Iterator<Shard> shardItr = returnedShards.iterator(); shardItr.hasNext(); ) {
-             final Shard candidate = shardItr.next();
-             if (KinesisStreamShard.compareShardIds(candidate.getShardId(), startShardId) <= 0) {
-                 shardItr.remove();
-             }
-         }
-     }
-
-     return described;
- }
-
- /**
-  * Computes a "full jitter" backoff: the exponential backoff {@code base * power^attempt},
-  * capped at {@code max}, scaled by a random factor drawn from {@code [0, 1)}.
-  *
-  * @param base base backoff in millis
-  * @param max upper bound for the exponential backoff in millis
-  * @param power base of the exponential growth
-  * @param attempt number of failed attempts so far, used as the exponent
-  * @return the number of millis to back off for
-  */
- private static long fullJitterBackoff(long base, long max, double power, int attempt) {
-     final double uncappedBackoff = base * Math.pow(power, attempt);
-     final long exponentialBackoff = (long) Math.min(max, uncappedBackoff);
-     // random jitter between 0 and the exponential backoff
-     return (long) (seed.nextDouble() * exponentialBackoff);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
deleted file mode 100644
index 39ddc52..0000000
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kinesis.proxy;
-
-import com.amazonaws.services.kinesis.model.GetRecordsResult;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
-
-import java.util.Map;
-
-/**
- * Interface for a Kinesis proxy that operates on multiple Kinesis streams within the same AWS service region.
- *
- * It declares three operations: obtaining a shard iterator, reading a batch of records through
- * a shard iterator, and listing the shards of several streams. All three declare
- * {@link InterruptedException}.
- */
-public interface KinesisProxyInterface {
-
- /**
- * Get a shard iterator from the specified position in a shard.
- * The retrieved shard iterator can be used in {@link KinesisProxyInterface#getRecords(String, int)}
- * to read data from the Kinesis shard.
- *
- * @param shard the shard to get the iterator for
- * @param shardIteratorType the iterator type, defining how the shard is to be iterated
- * (one of: TRIM_HORIZON, LATEST, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER)
- * @param startingSeqNum sequence number, must be null if shardIteratorType is TRIM_HORIZON or LATEST
- * @return shard iterator which can be used to read data from Kinesis
- * @throws InterruptedException this method will retry with backoff if AWS Kinesis complains that the
- * operation has exceeded the rate limit; this exception will be thrown
- * if the backoff is interrupted.
- */
- String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) throws InterruptedException;
-
- /**
- * Get the next batch of data records using a specific shard iterator.
- *
- * @param shardIterator a shard iterator that encodes info about which shard to read and where to start reading
- * @param maxRecordsToGet the maximum number of records to retrieve for this batch
- * @return the batch of retrieved records, also with a shard iterator that can be used to get the next batch
- * @throws InterruptedException this method will retry with backoff if AWS Kinesis complains that the
- * operation has exceeded the rate limit; this exception will be thrown
- * if the backoff is interrupted.
- */
- GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException;
-
- /**
- * Get shard list of multiple Kinesis streams, ignoring the
- * shards of each stream before a specified last seen shard id.
- *
- * @param streamNamesWithLastSeenShardIds a map with stream name as key, and last seen shard id as value
- * @return result of the shard list query
- * @throws InterruptedException this method will retry with backoff if AWS Kinesis complains that the
- * operation has exceeded the rate limit; this exception will be thrown
- * if the backoff is interrupted.
- */
- GetShardListResult getShardList(Map<String,String> streamNamesWithLastSeenShardIds) throws InterruptedException;
-}