You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/28 06:17:44 UTC
[14/21] flink git commit: [FLINK-6711] Activate strict checkstyle for flink-connector-kinesis
[FLINK-6711] Activate strict checkstyle for flink-connector-kinesis
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b12de1ed
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b12de1ed
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b12de1ed
Branch: refs/heads/master
Commit: b12de1ed546fc902922797ef647af97f763a903a
Parents: 28e8043
Author: zentol <ch...@apache.org>
Authored: Wed May 24 23:56:16 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sun May 28 00:11:19 2017 +0200
----------------------------------------------------------------------
.../kinesis/FlinkKinesisConsumer.java | 15 ++---
.../kinesis/FlinkKinesisProducer.java | 31 +++++-----
.../connectors/kinesis/KinesisPartitioner.java | 11 +++-
.../kinesis/config/AWSConfigConstants.java | 26 ++++-----
.../kinesis/config/ConsumerConfigConstants.java | 45 +++++++--------
.../kinesis/config/ProducerConfigConstants.java | 2 +-
.../kinesis/examples/ConsumeFromKinesis.java | 3 +-
.../kinesis/examples/ProduceIntoKinesis.java | 11 +++-
.../kinesis/internals/KinesisDataFetcher.java | 48 ++++++++--------
.../kinesis/internals/ShardConsumer.java | 23 ++++----
.../kinesis/model/KinesisStreamShard.java | 10 ++--
.../kinesis/model/KinesisStreamShardState.java | 3 +-
.../kinesis/model/SentinelSequenceNumber.java | 16 +++---
.../kinesis/model/StreamShardHandle.java | 8 ++-
.../kinesis/proxy/GetShardListResult.java | 2 +-
.../connectors/kinesis/proxy/KinesisProxy.java | 56 ++++++++++---------
.../kinesis/proxy/KinesisProxyInterface.java | 7 ++-
.../KinesisDeserializationSchema.java | 2 +-
.../KinesisDeserializationSchemaWrapper.java | 2 +-
.../KinesisSerializationSchema.java | 4 +-
.../connectors/kinesis/util/AWSUtil.java | 9 +--
.../kinesis/util/KinesisConfigUtil.java | 5 +-
.../FlinkKinesisConsumerMigrationTest.java | 18 +++---
.../kinesis/FlinkKinesisConsumerTest.java | 19 ++++---
.../internals/KinesisDataFetcherTest.java | 59 +++++++++++---------
.../kinesis/internals/ShardConsumerTest.java | 12 ++--
.../manualtests/ManualConsumerProducerTest.java | 16 +++---
.../manualtests/ManualExactlyOnceTest.java | 14 +++--
...nualExactlyOnceWithStreamReshardingTest.java | 23 ++++----
.../kinesis/manualtests/ManualProducerTest.java | 9 +--
.../kinesis/proxy/KinesisProxyTest.java | 9 ++-
.../ExactlyOnceValidatingConsumerThread.java | 15 ++---
.../testutils/FakeKinesisBehavioursFactory.java | 37 ++++++------
.../KinesisEventsGeneratorProducerThread.java | 7 ++-
.../testutils/KinesisShardIdGenerator.java | 6 +-
.../testutils/TestableFlinkKinesisConsumer.java | 4 ++
.../testutils/TestableKinesisDataFetcher.java | 21 ++++---
37 files changed, 330 insertions(+), 278 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index ea76ccc..d127f2b 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -44,13 +44,14 @@ import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeseri
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
-import java.util.Map;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -82,24 +83,24 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
// Consumer properties
// ------------------------------------------------------------------------
- /** The names of the Kinesis streams that we will be consuming from */
+ /** The names of the Kinesis streams that we will be consuming from. */
private final List<String> streams;
/** Properties to parametrize settings such as AWS service region, initial position in stream,
- * shard list retrieval behaviours, etc */
+ * shard list retrieval behaviours, etc. */
private final Properties configProps;
- /** User supplied deserialization schema to convert Kinesis byte messages to Flink objects */
+ /** User supplied deserialization schema to convert Kinesis byte messages to Flink objects. */
private final KinesisDeserializationSchema<T> deserializer;
// ------------------------------------------------------------------------
// Runtime state
// ------------------------------------------------------------------------
- /** Per-task fetcher for Kinesis data records, where each fetcher pulls data from one or more Kinesis shards */
+ /** Per-task fetcher for Kinesis data records, where each fetcher pulls data from one or more Kinesis shards. */
private transient KinesisDataFetcher<T> fetcher;
- /** The sequence numbers to restore to upon restore from failure */
+ /** The sequence numbers to restore to upon restore from failure. */
private transient HashMap<StreamShardMetadata, SequenceNumber> sequenceNumsToRestore;
private volatile boolean running = true;
@@ -108,7 +109,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
// State for Checkpoint
// ------------------------------------------------------------------------
- /** State name to access shard sequence number states; cannot be changed */
+ /** State name to access shard sequence number states; cannot be changed. */
private static final String sequenceNumsStateStoreName = "Kinesis-Stream-Shard-State";
private transient ListState<Tuple2<StreamShardMetadata, SequenceNumber>> sequenceNumsStateForCheckpoint;
http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
index 579bd6b..04d7055 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
@@ -14,16 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.kinesis;
-import com.amazonaws.services.kinesis.producer.Attempt;
-import com.amazonaws.services.kinesis.producer.KinesisProducer;
-import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
-import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
-import com.amazonaws.services.kinesis.producer.UserRecordResult;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -33,6 +26,15 @@ import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.util.PropertiesUtil;
+
+import com.amazonaws.services.kinesis.producer.Attempt;
+import com.amazonaws.services.kinesis.producer.KinesisProducer;
+import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
+import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
+import com.amazonaws.services.kinesis.producer.UserRecordResult;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,10 +72,8 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
/* Optional custom partitioner */
private KinesisPartitioner<OUT> customPartitioner = null;
-
// --------------------------- Runtime fields ---------------------------
-
/* Our Kinesis instance for each parallel Flink sink */
private transient KinesisProducer producer;
@@ -83,10 +83,8 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
/* Field for async exception */
private transient volatile Throwable thrownException;
-
// --------------------------- Initialization and configuration ---------------------------
-
/**
* Create a new FlinkKinesisProducer.
* This is a constructor supporting Flink's {@see SerializationSchema}.
@@ -104,6 +102,7 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
return ByteBuffer.wrap(schema.serialize(element));
}
// use default stream and hash key
+
@Override
public String getTargetStream(OUT element) {
return null;
@@ -147,7 +146,7 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
}
/**
- * Set default partition id
+ * Set default partition id.
* @param defaultPartition Name of the default partition
*/
public void setDefaultPartition(String defaultPartition) {
@@ -160,10 +159,8 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
this.customPartitioner = partitioner;
}
-
// --------------------------- Lifecycle methods ---------------------------
-
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
@@ -186,7 +183,7 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
@Override
public void onSuccess(UserRecordResult result) {
if (!result.isSuccessful()) {
- if(failOnError) {
+ if (failOnError) {
thrownException = new RuntimeException("Record was not sent successful");
} else {
LOG.warn("Record was not sent successful");
@@ -222,7 +219,7 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
List<Attempt> attempts = ((UserRecordFailedException) thrownException).getResult().getAttempts();
for (Attempt attempt: attempts) {
if (attempt.getErrorMessage() != null) {
- errorMessages += attempt.getErrorMessage() +"\n";
+ errorMessages += attempt.getErrorMessage() + "\n";
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java
index bd23abe..6af01c9 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java
@@ -14,22 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.kinesis;
+package org.apache.flink.streaming.connectors.kinesis;
import java.io.Serializable;
+/**
+ * An interface for partitioning records.
+ *
+ * @param <T> record type
+ */
public abstract class KinesisPartitioner<T> implements Serializable {
/**
- * Return a partition id based on the input
+ * Return a partition id based on the input.
* @param element Element to partition
* @return A string representing the partition id
*/
public abstract String getPartitionId(T element);
/**
- * Optional method for setting an explicit hash key
+ * Optional method for setting an explicit hash key.
* @param element Element to get the hash key for
* @return the hash key for the element
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
index 01d4f00..eb14fc0 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis.config;
import com.amazonaws.auth.AWSCredentialsProvider;
/**
- * Configuration keys for AWS service usage
+ * Configuration keys for AWS service usage.
*/
public class AWSConfigConstants {
@@ -30,41 +30,41 @@ public class AWSConfigConstants {
*/
public enum CredentialProvider {
- /** Look for the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY to create AWS credentials */
+ /** Look for the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY to create AWS credentials. */
ENV_VAR,
- /** Look for Java system properties aws.accessKeyId and aws.secretKey to create AWS credentials */
+ /** Look for Java system properties aws.accessKeyId and aws.secretKey to create AWS credentials. */
SYS_PROP,
- /** Use a AWS credentials profile file to create the AWS credentials */
+ /** Use a AWS credentials profile file to create the AWS credentials. */
PROFILE,
- /** Simply create AWS credentials by supplying the AWS access key ID and AWS secret key in the configuration properties */
+ /** Simply create AWS credentials by supplying the AWS access key ID and AWS secret key in the configuration properties. */
BASIC,
- /** A credentials provider chain will be used that searches for credentials in this order: ENV_VARS, SYS_PROPS, PROFILE in the AWS instance metadata **/
+ /** A credentials provider chain will be used that searches for credentials in this order: ENV_VARS, SYS_PROPS, PROFILE in the AWS instance metadata. **/
AUTO,
}
- /** The AWS region of the Kinesis streams to be pulled ("us-east-1" is used if not set) */
+ /** The AWS region of the Kinesis streams to be pulled ("us-east-1" is used if not set). */
public static final String AWS_REGION = "aws.region";
- /** The AWS access key ID to use when setting credentials provider type to BASIC */
+ /** The AWS access key ID to use when setting credentials provider type to BASIC. */
public static final String AWS_ACCESS_KEY_ID = "aws.credentials.provider.basic.accesskeyid";
- /** The AWS secret key to use when setting credentials provider type to BASIC */
+ /** The AWS secret key to use when setting credentials provider type to BASIC. */
public static final String AWS_SECRET_ACCESS_KEY = "aws.credentials.provider.basic.secretkey";
- /** The credential provider type to use when AWS credentials are required (BASIC is used if not set)*/
+ /** The credential provider type to use when AWS credentials are required (BASIC is used if not set). */
public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";
- /** Optional configuration for profile path if credential provider type is set to be PROFILE */
+ /** Optional configuration for profile path if credential provider type is set to be PROFILE. */
public static final String AWS_PROFILE_PATH = "aws.credentials.provider.profile.path";
- /** Optional configuration for profile name if credential provider type is set to be PROFILE */
+ /** Optional configuration for profile name if credential provider type is set to be PROFILE. */
public static final String AWS_PROFILE_NAME = "aws.credentials.provider.profile.name";
- /** The AWS endpoint for Kinesis (derived from the AWS region setting if not set) */
+ /** The AWS endpoint for Kinesis (derived from the AWS region setting if not set). */
public static final String AWS_ENDPOINT = "aws.endpoint";
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index 7c31af4..8362776 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -17,13 +17,14 @@
package org.apache.flink.streaming.connectors.kinesis.config;
-import com.amazonaws.services.kinesis.model.ShardIteratorType;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+
/**
- * Optional consumer specific configuration keys and default values for {@link FlinkKinesisConsumer}
+ * Optional consumer specific configuration keys and default values for {@link FlinkKinesisConsumer}.
*/
public class ConsumerConfigConstants extends AWSConfigConstants {
@@ -33,13 +34,13 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
*/
public enum InitialPosition {
- /** Start reading from the earliest possible record in the stream (excluding expired data records) */
+ /** Start reading from the earliest possible record in the stream (excluding expired data records). */
TRIM_HORIZON(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM),
- /** Start reading from the latest incoming record */
+ /** Start reading from the latest incoming record. */
LATEST(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM),
- /** Start reading from the record at the specified timestamp */
+ /** Start reading from the record at the specified timestamp. */
AT_TIMESTAMP(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM);
private SentinelSequenceNumber sentinelSequenceNumber;
@@ -53,55 +54,55 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
}
}
- /** The initial position to start reading Kinesis streams from (LATEST is used if not set) */
+ /** The initial position to start reading Kinesis streams from (LATEST is used if not set). */
public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";
- /** The initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION) */
+ /** The initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */
public static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.initpos.timestamp";
- /** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION) */
+ /** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */
public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";
- /** The base backoff time between each describeStream attempt */
+ /** The base backoff time between each describeStream attempt. */
public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
- /** The maximum backoff time between each describeStream attempt */
+ /** The maximum backoff time between each describeStream attempt. */
public static final String STREAM_DESCRIBE_BACKOFF_MAX = "flink.stream.describe.backoff.max";
- /** The power constant for exponential backoff between each describeStream attempt */
+ /** The power constant for exponential backoff between each describeStream attempt. */
public static final String STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = "flink.stream.describe.backoff.expconst";
- /** The maximum number of records to try to get each time we fetch records from a AWS Kinesis shard */
+ /** The maximum number of records to try to get each time we fetch records from a AWS Kinesis shard. */
public static final String SHARD_GETRECORDS_MAX = "flink.shard.getrecords.maxrecordcount";
- /** The maximum number of getRecords attempts if we get ProvisionedThroughputExceededException */
+ /** The maximum number of getRecords attempts if we get ProvisionedThroughputExceededException. */
public static final String SHARD_GETRECORDS_RETRIES = "flink.shard.getrecords.maxretries";
- /** The base backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException */
+ /** The base backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException. */
public static final String SHARD_GETRECORDS_BACKOFF_BASE = "flink.shard.getrecords.backoff.base";
- /** The maximum backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException */
+ /** The maximum backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException. */
public static final String SHARD_GETRECORDS_BACKOFF_MAX = "flink.shard.getrecords.backoff.max";
- /** The power constant for exponential backoff between each getRecords attempt */
+ /** The power constant for exponential backoff between each getRecords attempt. */
public static final String SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT = "flink.shard.getrecords.backoff.expconst";
- /** The interval between each getRecords request to a AWS Kinesis shard in milliseconds */
+ /** The interval between each getRecords request to a AWS Kinesis shard in milliseconds. */
public static final String SHARD_GETRECORDS_INTERVAL_MILLIS = "flink.shard.getrecords.intervalmillis";
- /** The maximum number of getShardIterator attempts if we get ProvisionedThroughputExceededException */
+ /** The maximum number of getShardIterator attempts if we get ProvisionedThroughputExceededException. */
public static final String SHARD_GETITERATOR_RETRIES = "flink.shard.getiterator.maxretries";
- /** The base backoff time between getShardIterator attempts if we get a ProvisionedThroughputExceededException */
+ /** The base backoff time between getShardIterator attempts if we get a ProvisionedThroughputExceededException. */
public static final String SHARD_GETITERATOR_BACKOFF_BASE = "flink.shard.getiterator.backoff.base";
- /** The maximum backoff time between getShardIterator attempts if we get a ProvisionedThroughputExceededException */
+ /** The maximum backoff time between getShardIterator attempts if we get a ProvisionedThroughputExceededException. */
public static final String SHARD_GETITERATOR_BACKOFF_MAX = "flink.shard.getiterator.backoff.max";
- /** The power constant for exponential backoff between each getShardIterator attempt */
+ /** The power constant for exponential backoff between each getShardIterator attempt. */
public static final String SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT = "flink.shard.getiterator.backoff.expconst";
- /** The interval between each attempt to discover new shards */
+ /** The interval between each attempt to discover new shards. */
public static final String SHARD_DISCOVERY_INTERVAL_MILLIS = "flink.shard.discovery.intervalmillis";
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
index 1edddfc..d131150 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis.config;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
/**
- * Optional producer specific configuration keys for {@link FlinkKinesisProducer}
+ * Optional producer specific configuration keys for {@link FlinkKinesisProducer}.
*/
public class ProducerConfigConstants extends AWSConfigConstants {
http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
index 55668c6..b1ac057 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.kinesis.examples;
import org.apache.flink.api.java.utils.ParameterTool;
@@ -26,7 +27,7 @@ import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import java.util.Properties;
/**
- * This is an example on how to consume data from Kinesis
+ * This is an example on how to consume data from Kinesis.
*/
public class ConsumeFromKinesis {
http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
index d178137..ee031eb 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
@@ -14,9 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.kinesis.examples;
-import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -25,10 +25,12 @@ import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.commons.lang3.RandomStringUtils;
+
import java.util.Properties;
/**
- * This is an example on how to produce data into Kinesis
+ * This is an example on how to produce data into Kinesis.
*/
public class ProduceIntoKinesis {
@@ -57,13 +59,16 @@ public class ProduceIntoKinesis {
see.execute();
}
+ /**
+ * Data generator that creates strings starting with a sequence number followed by a dash and 12 random characters.
+ */
public static class EventsGenerator implements SourceFunction<String> {
private boolean running = true;
@Override
public void run(SourceContext<String> ctx) throws Exception {
long seq = 0;
- while(running) {
+ while (running) {
Thread.sleep(10);
ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 11ac6d4..bbfbb20 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -17,30 +17,30 @@
package org.apache.flink.streaming.connectors.kinesis.internals;
-import com.amazonaws.services.kinesis.model.HashKeyRange;
-import com.amazonaws.services.kinesis.model.SequenceNumberRange;
-import com.amazonaws.services.kinesis.model.Shard;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.util.InstantiationUtil;
+
+import com.amazonaws.services.kinesis.model.HashKeyRange;
+import com.amazonaws.services.kinesis.model.SequenceNumberRange;
+import com.amazonaws.services.kinesis.model.Shard;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
-import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -77,10 +77,10 @@ public class KinesisDataFetcher<T> {
// Consumer-wide settings
// ------------------------------------------------------------------------
- /** Configuration properties for the Flink Kinesis Consumer */
+ /** Configuration properties for the Flink Kinesis Consumer. */
private final Properties configProps;
- /** The list of Kinesis streams that the consumer is subscribing to */
+ /** The list of Kinesis streams that the consumer is subscribing to. */
private final List<String> streams;
/**
@@ -94,7 +94,7 @@ public class KinesisDataFetcher<T> {
// Subtask-specific settings
// ------------------------------------------------------------------------
- /** Runtime context of the subtask that this fetcher was created in */
+ /** Runtime context of the subtask that this fetcher was created in. */
private final RuntimeContext runtimeContext;
private final int totalNumberOfConsumerSubtasks;
@@ -105,7 +105,7 @@ public class KinesisDataFetcher<T> {
// Executor services to run created threads
// ------------------------------------------------------------------------
- /** Executor service to run {@link ShardConsumer}s to consume Kinesis shards */
+ /** Executor service to run {@link ShardConsumer}s to consume Kinesis shards. */
private final ExecutorService shardConsumersExecutor;
// ------------------------------------------------------------------------
@@ -135,22 +135,22 @@ public class KinesisDataFetcher<T> {
private final SourceFunction.SourceContext<T> sourceContext;
- /** Checkpoint lock, also used to synchronize operations on subscribedShardsState */
+ /** Checkpoint lock, also used to synchronize operations on subscribedShardsState. */
private final Object checkpointLock;
- /** Reference to the first error thrown by any of the {@link ShardConsumer} threads */
+ /** Reference to the first error thrown by any of the {@link ShardConsumer} threads. */
private final AtomicReference<Throwable> error;
- /** The Kinesis proxy that the fetcher will be using to discover new shards */
+ /** The Kinesis proxy that the fetcher will be using to discover new shards. */
private final KinesisProxyInterface kinesis;
- /** Thread that executed runFetcher() */
+ /** Thread that executed runFetcher(). */
private volatile Thread mainThread;
/**
* The current number of shards that are actively read by this fetcher.
*
- * This value is updated in {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)},
+ * <p>This value is updated in {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)},
* and {@link KinesisDataFetcher#updateState(int, SequenceNumber)}.
*/
private final AtomicInteger numberOfActiveShards = new AtomicInteger(0);
@@ -183,7 +183,7 @@ public class KinesisDataFetcher<T> {
KinesisProxy.create(configProps));
}
- /** This constructor is exposed for testing purposes */
+ /** This constructor is exposed for testing purposes. */
protected KinesisDataFetcher(List<String> streams,
SourceFunction.SourceContext<T> sourceContext,
Object checkpointLock,
@@ -381,9 +381,9 @@ public class KinesisDataFetcher<T> {
shardConsumersExecutor.shutdownNow();
}
- /** After calling {@link KinesisDataFetcher#shutdownFetcher()}, this can be called to await the fetcher shutdown */
+ /** After calling {@link KinesisDataFetcher#shutdownFetcher()}, this can be called to await the fetcher shutdown. */
public void awaitTermination() throws InterruptedException {
- while(!shardConsumersExecutor.isTerminated()) {
+ while (!shardConsumersExecutor.isTerminated()) {
Thread.sleep(50);
}
}
@@ -400,7 +400,7 @@ public class KinesisDataFetcher<T> {
// Functions that update the subscribedStreamToLastDiscoveredShardIds state
// ------------------------------------------------------------------------
- /** Updates the last discovered shard of a subscribed stream; only updates if the update is valid */
+ /** Updates the last discovered shard of a subscribed stream; only updates if the update is valid. */
public void advanceLastDiscoveredShardOfStream(String stream, String shardId) {
String lastSeenShardIdOfStream = this.subscribedStreamsToLastDiscoveredShardIds.get(stream);
@@ -417,7 +417,7 @@ public class KinesisDataFetcher<T> {
/**
* A utility function that does the following:
*
- * 1. Find new shards for each stream that we haven't seen before
+ * <p>1. Find new shards for each stream that we haven't seen before
* 2. For each new shard, determine whether this consumer subtask should subscribe to them;
* if yes, it is added to the returned list of shards
* 3. Update the subscribedStreamsToLastDiscoveredShardIds state so that we won't get shards
@@ -538,7 +538,7 @@ public class KinesisDataFetcher<T> {
this.numberOfActiveShards.incrementAndGet();
}
- return subscribedShardsState.size()-1;
+ return subscribedShardsState.size() - 1;
}
}
@@ -574,7 +574,7 @@ public class KinesisDataFetcher<T> {
/**
* Utility function to create an initial map of the last discovered shard id of each subscribed stream, set to null;
- * This is called in the constructor; correct values will be set later on by calling advanceLastDiscoveredShardOfStream()
+ * This is called in the constructor; correct values will be set later on by calling advanceLastDiscoveredShardOfStream().
*
* @param streams the list of subscribed streams
* @return the initial map for subscribedStreamsToLastDiscoveredShardIds
@@ -588,7 +588,7 @@ public class KinesisDataFetcher<T> {
}
/**
- * Utility function to convert {@link StreamShardHandle} into {@link StreamShardMetadata}
+ * Utility function to convert {@link StreamShardHandle} into {@link StreamShardMetadata}.
*
* @param streamShardHandle the {@link StreamShardHandle} to be converted
* @return a {@link StreamShardMetadata} object
@@ -615,7 +615,7 @@ public class KinesisDataFetcher<T> {
}
/**
- * Utility function to convert {@link StreamShardMetadata} into {@link StreamShardHandle}
+ * Utility function to convert {@link StreamShardMetadata} into {@link StreamShardHandle}.
*
* @param streamShardMetadata the {@link StreamShardMetadata} to be converted
* @return a {@link StreamShardHandle} object
http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index a724b49..2d48e5f 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -17,19 +17,20 @@
package org.apache.flink.streaming.connectors.kinesis.internals;
-import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
-import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
-import com.amazonaws.services.kinesis.model.GetRecordsResult;
-import com.amazonaws.services.kinesis.model.Record;
-import com.amazonaws.services.kinesis.model.ShardIteratorType;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
-import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+
+import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -88,7 +89,7 @@ public class ShardConsumer<T> implements Runnable {
KinesisProxy.create(fetcherRef.getConsumerConfiguration()));
}
- /** This constructor is exposed for testing purposes */
+ /** This constructor is exposed for testing purposes. */
protected ShardConsumer(KinesisDataFetcher<T> fetcherRef,
Integer subscribedShardStateIndex,
StreamShardHandle subscribedShard,
@@ -186,7 +187,7 @@ public class ShardConsumer<T> implements Runnable {
}
}
- while(isRunning()) {
+ while (isRunning()) {
if (nextShardItr == null) {
fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
@@ -233,7 +234,7 @@ public class ShardConsumer<T> implements Runnable {
* {@link ShardConsumer#getRecords(String, int)} may be able to use the correct sequence number to refresh shard
* iterators if necessary.
*
- * Note that the server-side Kinesis timestamp is attached to the record when collected. When the
+ * <p>Note that the server-side Kinesis timestamp is attached to the record when collected. When the
* user program uses {@link TimeCharacteristic#EventTime}, this timestamp will be used by default.
*
* @param record record to deserialize and collect
@@ -275,7 +276,7 @@ public class ShardConsumer<T> implements Runnable {
* such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should
* be used for the next call to this method.
*
- * Note: it is important that this method is not called again before all the records from the last result have been
+ * <p>Note: it is important that this method is not called again before all the records from the last result have been
* fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise
* {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to
* incorrect shard iteration if the iterator had to be refreshed.
http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
index 592e30d..22bfbf5 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
@@ -37,7 +37,7 @@ public class KinesisStreamShard implements Serializable {
private final int cachedHash;
/**
- * Create a new KinesisStreamShard
+ * Create a new KinesisStreamShard.
*
* @param streamName
* the name of the Kinesis stream that this shard belongs to
@@ -96,7 +96,7 @@ public class KinesisStreamShard implements Serializable {
}
/**
- * Utility function to compare two shard ids
+ * Utility function to compare two shard ids.
*
* @param firstShardId first shard id to compare
* @param secondShardId second shard id to compare
@@ -126,7 +126,9 @@ public class KinesisStreamShard implements Serializable {
* @return whether the shard id is valid
*/
public static boolean isValidShardId(String shardId) {
- if (shardId == null) { return false; }
+ if (shardId == null) {
+ return false;
+ }
return shardId.matches("^shardId-\\d{12}");
}
@@ -148,7 +150,7 @@ public class KinesisStreamShard implements Serializable {
streamShardMetadata.setStartingHashKey(kinesisStreamShard.getShard().getHashKeyRange().getStartingHashKey());
streamShardMetadata.setEndingHashKey(kinesisStreamShard.getShard().getHashKeyRange().getEndingHashKey());
}
-
+
if (kinesisStreamShard.getShard().getSequenceNumberRange() != null) {
streamShardMetadata.setStartingSequenceNumber(kinesisStreamShard.getShard().getSequenceNumberRange().getStartingSequenceNumber());
streamShardMetadata.setEndingSequenceNumber(kinesisStreamShard.getShard().getSequenceNumberRange().getEndingSequenceNumber());
http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
index 4b1cc1c..fbd2e47 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
@@ -17,9 +17,10 @@
package org.apache.flink.streaming.connectors.kinesis.model;
-import com.amazonaws.services.kinesis.model.Shard;
import org.apache.flink.util.Preconditions;
+import com.amazonaws.services.kinesis.model.Shard;
+
/**
* A wrapper class that bundles a {@link StreamShardHandle} with its last processed sequence number.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
index 7f9dbbb..a5398e4 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
@@ -28,20 +28,20 @@ import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetche
public enum SentinelSequenceNumber {
/** Flag value for shard's sequence numbers to indicate that the
- * shard should start to be read from the latest incoming records */
- SENTINEL_LATEST_SEQUENCE_NUM( new SequenceNumber("LATEST_SEQUENCE_NUM") ),
+ * shard should start to be read from the latest incoming records. */
+ SENTINEL_LATEST_SEQUENCE_NUM(new SequenceNumber("LATEST_SEQUENCE_NUM")),
/** Flag value for shard's sequence numbers to indicate that the shard should
- * start to be read from the earliest records that haven't expired yet */
- SENTINEL_EARLIEST_SEQUENCE_NUM( new SequenceNumber("EARLIEST_SEQUENCE_NUM") ),
+ * start to be read from the earliest records that haven't expired yet. */
+ SENTINEL_EARLIEST_SEQUENCE_NUM(new SequenceNumber("EARLIEST_SEQUENCE_NUM")),
/** Flag value for shard's sequence numbers to indicate that the shard should
- * start to be read from the specified timestamp */
- SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM( new SequenceNumber("AT_TIMESTAMP_SEQUENCE_NUM") ),
+ * start to be read from the specified timestamp. */
+ SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM(new SequenceNumber("AT_TIMESTAMP_SEQUENCE_NUM")),
/** Flag value to indicate that we have already read the last record of this shard
- * (Note: Kinesis shards that have been closed due to a split or merge will have an ending data record) */
- SENTINEL_SHARD_ENDING_SEQUENCE_NUM( new SequenceNumber("SHARD_ENDING_SEQUENCE_NUM") );
+ * (Note: Kinesis shards that have been closed due to a split or merge will have an ending data record). */
+ SENTINEL_SHARD_ENDING_SEQUENCE_NUM(new SequenceNumber("SHARD_ENDING_SEQUENCE_NUM"));
private SequenceNumber sentinel;
http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardHandle.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardHandle.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardHandle.java
index d340a88..767c227 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardHandle.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardHandle.java
@@ -34,7 +34,7 @@ public class StreamShardHandle {
private final int cachedHash;
/**
- * Create a new StreamShardHandle
+ * Create a new StreamShardHandle.
*
* @param streamName
* the name of the Kinesis stream that this shard belongs to
@@ -93,7 +93,7 @@ public class StreamShardHandle {
}
/**
- * Utility function to compare two shard ids
+ * Utility function to compare two shard ids.
*
* @param firstShardId first shard id to compare
* @param secondShardId second shard id to compare
@@ -123,7 +123,9 @@ public class StreamShardHandle {
* @return whether the shard id is valid
*/
public static boolean isValidShardId(String shardId) {
- if (shardId == null) { return false; }
+ if (shardId == null) {
+ return false;
+ }
return shardId.matches("^shardId-\\d{12}");
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
index aadb31c..fcfb3ac 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
@@ -19,9 +19,9 @@ package org.apache.flink.streaming.connectors.kinesis.proxy;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
-import java.util.HashMap;
import java.util.Map;
import java.util.Set;
http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 70c1286..89e9f04 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -17,34 +17,36 @@
package org.apache.flink.streaming.connectors.kinesis.proxy;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
import com.amazonaws.services.kinesis.model.LimitExceededException;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
-import com.amazonaws.services.kinesis.model.StreamStatus;
import com.amazonaws.services.kinesis.model.Shard;
-import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
-import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
-import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import com.amazonaws.services.kinesis.model.StreamStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+
import java.util.ArrayList;
+import java.util.Date;
import java.util.Iterator;
import java.util.List;
-import java.util.Properties;
import java.util.Map;
+import java.util.Properties;
import java.util.Random;
-import java.util.Date;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -53,7 +55,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* calls to AWS Kinesis for several functions, such as getting a list of shards and
* fetching a batch of data records starting from a specified record sequence number.
*
- * NOTE:
+ * <p>NOTE:
* In the AWS KCL library, there is a similar implementation - {@link com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy}.
* This implementation differs mainly in that we can make operations to arbitrary Kinesis streams, which is a needed
* functionality for the Flink Kinesis Connector since the consumer may simultaneously read from multiple Kinesis streams.
@@ -62,59 +64,59 @@ public class KinesisProxy implements KinesisProxyInterface {
private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class);
- /** The actual Kinesis client from the AWS SDK that we will be using to make calls */
+ /** The actual Kinesis client from the AWS SDK that we will be using to make calls. */
private final AmazonKinesisClient kinesisClient;
- /** Random seed used to calculate backoff jitter for Kinesis operations */
- private final static Random seed = new Random();
+ /** Random seed used to calculate backoff jitter for Kinesis operations. */
+ private static final Random seed = new Random();
// ------------------------------------------------------------------------
// describeStream() related performance settings
// ------------------------------------------------------------------------
- /** Base backoff millis for the describe stream operation */
+ /** Base backoff millis for the describe stream operation. */
private final long describeStreamBaseBackoffMillis;
- /** Maximum backoff millis for the describe stream operation */
+ /** Maximum backoff millis for the describe stream operation. */
private final long describeStreamMaxBackoffMillis;
- /** Exponential backoff power constant for the describe stream operation */
+ /** Exponential backoff power constant for the describe stream operation. */
private final double describeStreamExpConstant;
// ------------------------------------------------------------------------
// getRecords() related performance settings
// ------------------------------------------------------------------------
- /** Base backoff millis for the get records operation */
+ /** Base backoff millis for the get records operation. */
private final long getRecordsBaseBackoffMillis;
- /** Maximum backoff millis for the get records operation */
+ /** Maximum backoff millis for the get records operation. */
private final long getRecordsMaxBackoffMillis;
- /** Exponential backoff power constant for the get records operation */
+ /** Exponential backoff power constant for the get records operation. */
private final double getRecordsExpConstant;
- /** Maximum attempts for the get records operation */
+ /** Maximum attempts for the get records operation. */
private final int getRecordsMaxAttempts;
// ------------------------------------------------------------------------
// getShardIterator() related performance settings
// ------------------------------------------------------------------------
- /** Base backoff millis for the get shard iterator operation */
+ /** Base backoff millis for the get shard iterator operation. */
private final long getShardIteratorBaseBackoffMillis;
- /** Maximum backoff millis for the get shard iterator operation */
+ /** Maximum backoff millis for the get shard iterator operation. */
private final long getShardIteratorMaxBackoffMillis;
- /** Exponential backoff power constant for the get shard iterator operation */
+ /** Exponential backoff power constant for the get shard iterator operation. */
private final double getShardIteratorExpConstant;
- /** Maximum attempts for the get shard iterator operation */
+ /** Maximum attempts for the get shard iterator operation. */
private final int getShardIteratorMaxAttempts;
/**
- * Create a new KinesisProxy based on the supplied configuration properties
+ * Create a new KinesisProxy based on the supplied configuration properties.
*
* @param configProps configuration properties containing AWS credential and AWS region info
*/
@@ -225,7 +227,7 @@ public class KinesisProxy implements KinesisProxyInterface {
public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) throws InterruptedException {
GetShardListResult result = new GetShardListResult();
- for (Map.Entry<String,String> streamNameWithLastSeenShardId : streamNamesWithLastSeenShardIds.entrySet()) {
+ for (Map.Entry<String, String> streamNameWithLastSeenShardId : streamNamesWithLastSeenShardIds.entrySet()) {
String stream = streamNameWithLastSeenShardId.getKey();
String lastSeenShardId = streamNameWithLastSeenShardId.getValue();
result.addRetrievedShardsToStream(stream, getShardsOfStream(stream, lastSeenShardId));
@@ -294,7 +296,7 @@ public class KinesisProxy implements KinesisProxyInterface {
/**
* Determines whether the exception is recoverable using exponential-backoff.
- *
+ *
* @param ex Exception to inspect
* @return <code>true</code> if the exception can be recovered from, else
* <code>false</code>
@@ -338,7 +340,7 @@ public class KinesisProxy implements KinesisProxyInterface {
/**
* Get metainfo for a Kinesis stream, which contains information about which shards this Kinesis stream possesses.
*
- * This method is using a "full jitter" approach described in AWS's article,
+ * <p>This method is using a "full jitter" approach described in AWS's article,
* <a href="https://www.awsarchitectureblog.com/2015/03/backoff.html">"Exponential Backoff and Jitter"</a>.
* This is necessary because concurrent calls will be made by all parallel subtask's fetcher. This
* jitter backoff approach will help distribute calls across the fetchers over time.
@@ -395,6 +397,6 @@ public class KinesisProxy implements KinesisProxyInterface {
private static long fullJitterBackoff(long base, long max, double power, int attempt) {
long exponentialBackoff = (long) Math.min(max, base * Math.pow(power, attempt));
- return (long)(seed.nextDouble()*exponentialBackoff); // random jitter between 0 and the exponential backoff
+ return (long) (seed.nextDouble() * exponentialBackoff); // random jitter between 0 and the exponential backoff
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
index 807a163..0538151 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
@@ -17,9 +17,10 @@
package org.apache.flink.streaming.connectors.kinesis.proxy;
-import com.amazonaws.services.kinesis.model.GetRecordsResult;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+
import java.util.Map;
/**
@@ -46,7 +47,7 @@ public interface KinesisProxyInterface {
String getShardIterator(StreamShardHandle shard, String shardIteratorType, Object startingMarker) throws InterruptedException;
/**
- * Get the next batch of data records using a specific shard iterator
+ * Get the next batch of data records using a specific shard iterator.
*
* @param shardIterator a shard iterator that encodes info about which shard to read and where to start reading
* @param maxRecordsToGet the maximum amount of records to retrieve for this batch
@@ -67,5 +68,5 @@ public interface KinesisProxyInterface {
* operation has exceeded the rate limit; this exception will be thrown
* if the backoff is interrupted.
*/
- GetShardListResult getShardList(Map<String,String> streamNamesWithLastSeenShardIds) throws InterruptedException;
+ GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) throws InterruptedException;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java
index 0effdd8..b06b20f 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java
@@ -33,7 +33,7 @@ import java.io.Serializable;
public interface KinesisDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
/**
- * Deserializes a Kinesis record's bytes
+ * Deserializes a Kinesis record's bytes.
*
* @param recordValue the record's value as a byte array
* @param partitionKey the record's partition key at the time of writing
http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java
index 6e66038..279d410 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java
@@ -23,7 +23,7 @@ import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import java.io.IOException;
/**
- * A simple wrapper for using the {@link DeserializationSchema} with the {@link KinesisDeserializationSchema} interface
+ * A simple wrapper for using the {@link DeserializationSchema} with the {@link KinesisDeserializationSchema} interface.
*
* @param <T> The type created by the deserialization schema.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisSerializationSchema.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisSerializationSchema.java
index 03dd72c..9be410a 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisSerializationSchema.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisSerializationSchema.java
@@ -14,8 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.kinesis.serialization;
+package org.apache.flink.streaming.connectors.kinesis.serialization;
import java.io.Serializable;
import java.nio.ByteBuffer;
@@ -27,7 +27,7 @@ import java.nio.ByteBuffer;
*/
public interface KinesisSerializationSchema<T> extends Serializable {
/**
- * Serialize the given element into a ByteBuffer
+ * Serialize the given element into a ByteBuffer.
*
* @param element The element to serialize
* @return Serialized representation of the element
http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
index a6aad02..5670526 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
@@ -17,6 +17,10 @@
package org.apache.flink.streaming.connectors.kinesis.util;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider;
+
import com.amazonaws.ClientConfiguration;
import com.amazonaws.ClientConfigurationFactory;
import com.amazonaws.auth.AWSCredentials;
@@ -29,9 +33,6 @@ import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider;
import java.util.Properties;
@@ -126,7 +127,7 @@ public class AWSUtil {
}
/**
- * Checks whether or not a region ID is valid
+ * Checks whether or not a region ID is valid.
*
* @param region The AWS region ID to check
* @return true if the supplied region ID is valid, false otherwise
http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index 244f5a5..42f1af0 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -17,7 +17,6 @@
package org.apache.flink.streaming.connectors.kinesis.util;
-import com.amazonaws.regions.Regions;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
@@ -26,6 +25,8 @@ import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConsta
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
+import com.amazonaws.regions.Regions;
+
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Properties;
@@ -141,7 +142,7 @@ public class KinesisConfigUtil {
}
/**
- * Validate configuration properties related to Amazon AWS service
+ * Validate configuration properties related to Amazon AWS service.
*/
public static void validateAwsConfiguration(Properties config) {
if (config.containsKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)) {
http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
index e24a411..af84420 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
@@ -17,19 +17,20 @@
package org.apache.flink.streaming.connectors.kinesis;
-import com.amazonaws.services.kinesis.model.Shard;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
-import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+
+import com.amazonaws.services.kinesis.model.Shard;
import org.junit.Test;
import java.net.URL;
@@ -37,8 +38,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Properties;
-import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.mockito.Mockito.mock;
/**
@@ -136,11 +137,12 @@ public class FlinkKinesisConsumerMigrationTest {
}
@Override
- protected KinesisDataFetcher<T> createFetcher(List<String> streams,
- SourceFunction.SourceContext<T> sourceContext,
- RuntimeContext runtimeContext,
- Properties configProps,
- KinesisDeserializationSchema<T> deserializationSchema) {
+ protected KinesisDataFetcher<T> createFetcher(
+ List<String> streams,
+ SourceFunction.SourceContext<T> sourceContext,
+ RuntimeContext runtimeContext,
+ Properties configProps,
+ KinesisDeserializationSchema<T> deserializationSchema) {
return mock(KinesisDataFetcher.class);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index 186dfa6..a26e758 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -17,9 +17,6 @@
package org.apache.flink.streaming.connectors.kinesis;
-import com.amazonaws.services.kinesis.model.HashKeyRange;
-import com.amazonaws.services.kinesis.model.SequenceNumberRange;
-import com.amazonaws.services.kinesis.model.Shard;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
@@ -38,15 +35,19 @@ import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConsta
import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
-import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import com.amazonaws.services.kinesis.model.HashKeyRange;
+import com.amazonaws.services.kinesis.model.SequenceNumberRange;
+import com.amazonaws.services.kinesis.model.Shard;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -59,21 +60,21 @@ import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
-import java.util.HashMap;
-import java.util.Map;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.UUID;
-import static org.junit.Assert.fail;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.never;
/**
* Suite of FlinkKinesisConsumer tests for the methods called throughout the source life cycle.
http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
index 4fb6dd4..2e1adb6 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -17,21 +17,22 @@
package org.apache.flink.streaming.connectors.kinesis.internals;
-import com.amazonaws.services.kinesis.model.HashKeyRange;
-import com.amazonaws.services.kinesis.model.SequenceNumberRange;
-import com.amazonaws.services.kinesis.model.Shard;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
-import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisBehavioursFactory;
import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDataFetcher;
+
+import com.amazonaws.services.kinesis.model.HashKeyRange;
+import com.amazonaws.services.kinesis.model.SequenceNumberRange;
+import com.amazonaws.services.kinesis.model.Shard;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
@@ -40,9 +41,9 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.util.HashMap;
-import java.util.Map;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
@@ -54,6 +55,9 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+/**
+ * Tests for the {@link KinesisDataFetcher}.
+ */
@RunWith(PowerMockRunner.class)
@PrepareForTest(TestableKinesisDataFetcher.class)
public class KinesisDataFetcherTest {
@@ -92,10 +96,10 @@ public class KinesisDataFetcherTest {
HashMap<String, String> subscribedStreamsToLastSeenShardIdsUnderTest =
KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(fakeStreams);
- Map<String,Integer> streamToShardCount = new HashMap<>();
+ Map<String, Integer> streamToShardCount = new HashMap<>();
Random rand = new Random();
for (String fakeStream : fakeStreams) {
- streamToShardCount.put(fakeStream, rand.nextInt(5)+1);
+ streamToShardCount.put(fakeStream, rand.nextInt(5) + 1);
}
final TestableKinesisDataFetcher fetcher =
@@ -140,10 +144,10 @@ public class KinesisDataFetcherTest {
assertTrue(streamsInState.containsAll(fakeStreams));
// assert that the last seen shards in state is correctly set
- for (Map.Entry<String,String> streamToLastSeenShard : subscribedStreamsToLastSeenShardIdsUnderTest.entrySet()) {
+ for (Map.Entry<String, String> streamToLastSeenShard : subscribedStreamsToLastSeenShardIdsUnderTest.entrySet()) {
assertTrue(
streamToLastSeenShard.getValue().equals(
- KinesisShardIdGenerator.generateFromShardOrder(streamToShardCount.get(streamToLastSeenShard.getKey())-1)));
+ KinesisShardIdGenerator.generateFromShardOrder(streamToShardCount.get(streamToLastSeenShard.getKey()) - 1)));
}
}
@@ -184,7 +188,7 @@ public class KinesisDataFetcherTest {
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
UUID.randomUUID().toString());
- Map<String,Integer> streamToShardCount = new HashMap<>();
+ Map<String, Integer> streamToShardCount = new HashMap<>();
streamToShardCount.put("fakeStream1", 3); // fakeStream1 will still have 3 shards after restore
streamToShardCount.put("fakeStream2", 2); // fakeStream2 will still have 2 shards after restore
@@ -230,10 +234,10 @@ public class KinesisDataFetcherTest {
assertTrue(streamsInState.containsAll(fakeStreams));
// assert that the last seen shards in state is correctly set
- for (Map.Entry<String,String> streamToLastSeenShard : subscribedStreamsToLastSeenShardIdsUnderTest.entrySet()) {
+ for (Map.Entry<String, String> streamToLastSeenShard : subscribedStreamsToLastSeenShardIdsUnderTest.entrySet()) {
assertTrue(
streamToLastSeenShard.getValue().equals(
- KinesisShardIdGenerator.generateFromShardOrder(streamToShardCount.get(streamToLastSeenShard.getKey())-1)));
+ KinesisShardIdGenerator.generateFromShardOrder(streamToShardCount.get(streamToLastSeenShard.getKey()) - 1)));
}
}
@@ -274,9 +278,9 @@ public class KinesisDataFetcherTest {
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
UUID.randomUUID().toString());
- Map<String,Integer> streamToShardCount = new HashMap<>();
- streamToShardCount.put("fakeStream1", 3+1); // fakeStream1 had 3 shards before & 1 new shard after restore
- streamToShardCount.put("fakeStream2", 2+3); // fakeStream2 had 2 shards before & 3 new shard after restore
+ Map<String, Integer> streamToShardCount = new HashMap<>();
+ streamToShardCount.put("fakeStream1", 3 + 1); // fakeStream1 had 3 shards before & 1 new shard after restore
+ streamToShardCount.put("fakeStream2", 2 + 3); // fakeStream2 had 2 shards before & 3 new shards after restore
HashMap<String, String> subscribedStreamsToLastSeenShardIdsUnderTest =
KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(fakeStreams);
@@ -321,10 +325,10 @@ public class KinesisDataFetcherTest {
assertTrue(streamsInState.containsAll(fakeStreams));
// assert that the last seen shards in state is correctly set
- for (Map.Entry<String,String> streamToLastSeenShard : subscribedStreamsToLastSeenShardIdsUnderTest.entrySet()) {
+ for (Map.Entry<String, String> streamToLastSeenShard : subscribedStreamsToLastSeenShardIdsUnderTest.entrySet()) {
assertTrue(
streamToLastSeenShard.getValue().equals(
- KinesisShardIdGenerator.generateFromShardOrder(streamToShardCount.get(streamToLastSeenShard.getKey())-1)));
+ KinesisShardIdGenerator.generateFromShardOrder(streamToShardCount.get(streamToLastSeenShard.getKey()) - 1)));
}
}
@@ -367,7 +371,7 @@ public class KinesisDataFetcherTest {
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
UUID.randomUUID().toString());
- Map<String,Integer> streamToShardCount = new HashMap<>();
+ Map<String, Integer> streamToShardCount = new HashMap<>();
streamToShardCount.put("fakeStream1", 3); // fakeStream1 has fixed 3 shards
streamToShardCount.put("fakeStream2", 2); // fakeStream2 has fixed 2 shards
streamToShardCount.put("fakeStream3", 0); // no shards can be found for fakeStream3
@@ -463,9 +467,9 @@ public class KinesisDataFetcherTest {
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
UUID.randomUUID().toString());
- Map<String,Integer> streamToShardCount = new HashMap<>();
- streamToShardCount.put("fakeStream1", 3+1); // fakeStream1 had 3 shards before & 1 new shard after restore
- streamToShardCount.put("fakeStream2", 2+3); // fakeStream2 had 2 shards before & 2 new shard after restore
+ Map<String, Integer> streamToShardCount = new HashMap<>();
+ streamToShardCount.put("fakeStream1", 3 + 1); // fakeStream1 had 3 shards before & 1 new shard after restore
+ streamToShardCount.put("fakeStream2", 2 + 3); // fakeStream2 had 2 shards before & 3 new shards after restore
streamToShardCount.put("fakeStream3", 0); // no shards can be found for fakeStream3
streamToShardCount.put("fakeStream4", 0); // no shards can be found for fakeStream4
@@ -569,11 +573,12 @@ public class KinesisDataFetcherTest {
}
@Override
- protected KinesisDataFetcher<T> createFetcher(List<String> streams,
- SourceFunction.SourceContext<T> sourceContext,
- RuntimeContext runtimeContext,
- Properties configProps,
- KinesisDeserializationSchema<T> deserializationSchema) {
+ protected KinesisDataFetcher<T> createFetcher(
+ List<String> streams,
+ SourceFunction.SourceContext<T> sourceContext,
+ RuntimeContext runtimeContext,
+ Properties configProps,
+ KinesisDeserializationSchema<T> deserializationSchema) {
return fetcher;
}