You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/28 06:17:44 UTC

[14/21] flink git commit: [FLINK-6711] Activate strict checkstyle for flink-connector-kinesis

[FLINK-6711] Activate strict checkstyle for flink-connector-kinesis


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b12de1ed
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b12de1ed
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b12de1ed

Branch: refs/heads/master
Commit: b12de1ed546fc902922797ef647af97f763a903a
Parents: 28e8043
Author: zentol <ch...@apache.org>
Authored: Wed May 24 23:56:16 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sun May 28 00:11:19 2017 +0200

----------------------------------------------------------------------
 .../kinesis/FlinkKinesisConsumer.java           | 15 ++---
 .../kinesis/FlinkKinesisProducer.java           | 31 +++++-----
 .../connectors/kinesis/KinesisPartitioner.java  | 11 +++-
 .../kinesis/config/AWSConfigConstants.java      | 26 ++++-----
 .../kinesis/config/ConsumerConfigConstants.java | 45 +++++++--------
 .../kinesis/config/ProducerConfigConstants.java |  2 +-
 .../kinesis/examples/ConsumeFromKinesis.java    |  3 +-
 .../kinesis/examples/ProduceIntoKinesis.java    | 11 +++-
 .../kinesis/internals/KinesisDataFetcher.java   | 48 ++++++++--------
 .../kinesis/internals/ShardConsumer.java        | 23 ++++----
 .../kinesis/model/KinesisStreamShard.java       | 10 ++--
 .../kinesis/model/KinesisStreamShardState.java  |  3 +-
 .../kinesis/model/SentinelSequenceNumber.java   | 16 +++---
 .../kinesis/model/StreamShardHandle.java        |  8 ++-
 .../kinesis/proxy/GetShardListResult.java       |  2 +-
 .../connectors/kinesis/proxy/KinesisProxy.java  | 56 ++++++++++---------
 .../kinesis/proxy/KinesisProxyInterface.java    |  7 ++-
 .../KinesisDeserializationSchema.java           |  2 +-
 .../KinesisDeserializationSchemaWrapper.java    |  2 +-
 .../KinesisSerializationSchema.java             |  4 +-
 .../connectors/kinesis/util/AWSUtil.java        |  9 +--
 .../kinesis/util/KinesisConfigUtil.java         |  5 +-
 .../FlinkKinesisConsumerMigrationTest.java      | 18 +++---
 .../kinesis/FlinkKinesisConsumerTest.java       | 19 ++++---
 .../internals/KinesisDataFetcherTest.java       | 59 +++++++++++---------
 .../kinesis/internals/ShardConsumerTest.java    | 12 ++--
 .../manualtests/ManualConsumerProducerTest.java | 16 +++---
 .../manualtests/ManualExactlyOnceTest.java      | 14 +++--
 ...nualExactlyOnceWithStreamReshardingTest.java | 23 ++++----
 .../kinesis/manualtests/ManualProducerTest.java |  9 +--
 .../kinesis/proxy/KinesisProxyTest.java         |  9 ++-
 .../ExactlyOnceValidatingConsumerThread.java    | 15 ++---
 .../testutils/FakeKinesisBehavioursFactory.java | 37 ++++++------
 .../KinesisEventsGeneratorProducerThread.java   |  7 ++-
 .../testutils/KinesisShardIdGenerator.java      |  6 +-
 .../testutils/TestableFlinkKinesisConsumer.java |  4 ++
 .../testutils/TestableKinesisDataFetcher.java   | 21 ++++---
 37 files changed, 330 insertions(+), 278 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index ea76ccc..d127f2b 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -44,13 +44,14 @@ import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeseri
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Map;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -82,24 +83,24 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 	//  Consumer properties
 	// ------------------------------------------------------------------------
 
-	/** The names of the Kinesis streams that we will be consuming from */
+	/** The names of the Kinesis streams that we will be consuming from. */
 	private final List<String> streams;
 
 	/** Properties to parametrize settings such as AWS service region, initial position in stream,
-	 * shard list retrieval behaviours, etc */
+	 * shard list retrieval behaviours, etc. */
 	private final Properties configProps;
 
-	/** User supplied deserialization schema to convert Kinesis byte messages to Flink objects */
+	/** User supplied deserialization schema to convert Kinesis byte messages to Flink objects. */
 	private final KinesisDeserializationSchema<T> deserializer;
 
 	// ------------------------------------------------------------------------
 	//  Runtime state
 	// ------------------------------------------------------------------------
 
-	/** Per-task fetcher for Kinesis data records, where each fetcher pulls data from one or more Kinesis shards */
+	/** Per-task fetcher for Kinesis data records, where each fetcher pulls data from one or more Kinesis shards. */
 	private transient KinesisDataFetcher<T> fetcher;
 
-	/** The sequence numbers to restore to upon restore from failure */
+	/** The sequence numbers to restore to upon restore from failure. */
 	private transient HashMap<StreamShardMetadata, SequenceNumber> sequenceNumsToRestore;
 
 	private volatile boolean running = true;
@@ -108,7 +109,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
 	//  State for Checkpoint
 	// ------------------------------------------------------------------------
 
-	/** State name to access shard sequence number states; cannot be changed */
+	/** State name to access shard sequence number states; cannot be changed. */
 	private static final String sequenceNumsStateStoreName = "Kinesis-Stream-Shard-State";
 
 	private transient ListState<Tuple2<StreamShardMetadata, SequenceNumber>> sequenceNumsStateForCheckpoint;

http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
index 579bd6b..04d7055 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
@@ -14,16 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.kinesis;
 
-import com.amazonaws.services.kinesis.producer.Attempt;
-import com.amazonaws.services.kinesis.producer.KinesisProducer;
-import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
-import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
-import com.amazonaws.services.kinesis.producer.UserRecordResult;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -33,6 +26,15 @@ import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.util.PropertiesUtil;
+
+import com.amazonaws.services.kinesis.producer.Attempt;
+import com.amazonaws.services.kinesis.producer.KinesisProducer;
+import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
+import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
+import com.amazonaws.services.kinesis.producer.UserRecordResult;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,10 +72,8 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
 	/* Optional custom partitioner */
 	private KinesisPartitioner<OUT> customPartitioner = null;
 
-
 	// --------------------------- Runtime fields ---------------------------
 
-
 	/* Our Kinesis instance for each parallel Flink sink */
 	private transient KinesisProducer producer;
 
@@ -83,10 +83,8 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
 	/* Field for async exception */
 	private transient volatile Throwable thrownException;
 
-
 	// --------------------------- Initialization and configuration  ---------------------------
 
-
 	/**
 	 * Create a new FlinkKinesisProducer.
 	 * This is a constructor supporting Flink's {@see SerializationSchema}.
@@ -104,6 +102,7 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
 				return ByteBuffer.wrap(schema.serialize(element));
 			}
 			// use default stream and hash key
+
 			@Override
 			public String getTargetStream(OUT element) {
 				return null;
@@ -147,7 +146,7 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
 	}
 
 	/**
-	 * Set default partition id
+	 * Set default partition id.
 	 * @param defaultPartition Name of the default partition
 	 */
 	public void setDefaultPartition(String defaultPartition) {
@@ -160,10 +159,8 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
 		this.customPartitioner = partitioner;
 	}
 
-
 	// --------------------------- Lifecycle methods ---------------------------
 
-
 	@Override
 	public void open(Configuration parameters) throws Exception {
 		super.open(parameters);
@@ -186,7 +183,7 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
 			@Override
 			public void onSuccess(UserRecordResult result) {
 				if (!result.isSuccessful()) {
-					if(failOnError) {
+					if (failOnError) {
 						thrownException = new RuntimeException("Record was not sent successful");
 					} else {
 						LOG.warn("Record was not sent successful");
@@ -222,7 +219,7 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
 				List<Attempt> attempts = ((UserRecordFailedException) thrownException).getResult().getAttempts();
 				for (Attempt attempt: attempts) {
 					if (attempt.getErrorMessage() != null) {
-						errorMessages += attempt.getErrorMessage() +"\n";
+						errorMessages += attempt.getErrorMessage() + "\n";
 					}
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java
index bd23abe..6af01c9 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java
@@ -14,22 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.connectors.kinesis;
 
+package org.apache.flink.streaming.connectors.kinesis;
 
 import java.io.Serializable;
 
+/**
+ * An interface for partitioning records.
+ *
+ * @param <T> record type
+ */
 public abstract class KinesisPartitioner<T> implements Serializable {
 
 	/**
-	 * Return a partition id based on the input
+	 * Return a partition id based on the input.
 	 * @param element Element to partition
 	 * @return A string representing the partition id
 	 */
 	public abstract String getPartitionId(T element);
 
 	/**
-	 * Optional method for setting an explicit hash key
+	 * Optional method for setting an explicit hash key.
 	 * @param element Element to get the hash key for
 	 * @return the hash key for the element
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
index 01d4f00..eb14fc0 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis.config;
 import com.amazonaws.auth.AWSCredentialsProvider;
 
 /**
- * Configuration keys for AWS service usage
+ * Configuration keys for AWS service usage.
  */
 public class AWSConfigConstants {
 
@@ -30,41 +30,41 @@ public class AWSConfigConstants {
 	 */
 	public enum CredentialProvider {
 
-		/** Look for the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY to create AWS credentials */
+		/** Look for the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY to create AWS credentials. */
 		ENV_VAR,
 
-		/** Look for Java system properties aws.accessKeyId and aws.secretKey to create AWS credentials */
+		/** Look for Java system properties aws.accessKeyId and aws.secretKey to create AWS credentials. */
 		SYS_PROP,
 
-		/** Use a AWS credentials profile file to create the AWS credentials */
+		/** Use a AWS credentials profile file to create the AWS credentials. */
 		PROFILE,
 
-		/** Simply create AWS credentials by supplying the AWS access key ID and AWS secret key in the configuration properties */
+		/** Simply create AWS credentials by supplying the AWS access key ID and AWS secret key in the configuration properties. */
 		BASIC,
 
-		/** A credentials provider chain will be used that searches for credentials in this order: ENV_VARS, SYS_PROPS, PROFILE in the AWS instance metadata **/
+		/** A credentials provider chain will be used that searches for credentials in this order: ENV_VARS, SYS_PROPS, PROFILE in the AWS instance metadata. **/
 		AUTO,
 	}
 
-	/** The AWS region of the Kinesis streams to be pulled ("us-east-1" is used if not set) */
+	/** The AWS region of the Kinesis streams to be pulled ("us-east-1" is used if not set). */
 	public static final String AWS_REGION = "aws.region";
 
-	/** The AWS access key ID to use when setting credentials provider type to BASIC */
+	/** The AWS access key ID to use when setting credentials provider type to BASIC. */
 	public static final String AWS_ACCESS_KEY_ID = "aws.credentials.provider.basic.accesskeyid";
 
-	/** The AWS secret key to use when setting credentials provider type to BASIC */
+	/** The AWS secret key to use when setting credentials provider type to BASIC. */
 	public static final String AWS_SECRET_ACCESS_KEY = "aws.credentials.provider.basic.secretkey";
 
-	/** The credential provider type to use when AWS credentials are required (BASIC is used if not set)*/
+	/** The credential provider type to use when AWS credentials are required (BASIC is used if not set). */
 	public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";
 
-	/** Optional configuration for profile path if credential provider type is set to be PROFILE */
+	/** Optional configuration for profile path if credential provider type is set to be PROFILE. */
 	public static final String AWS_PROFILE_PATH = "aws.credentials.provider.profile.path";
 
-	/** Optional configuration for profile name if credential provider type is set to be PROFILE */
+	/** Optional configuration for profile name if credential provider type is set to be PROFILE. */
 	public static final String AWS_PROFILE_NAME = "aws.credentials.provider.profile.name";
 
-	/** The AWS endpoint for Kinesis (derived from the AWS region setting if not set) */
+	/** The AWS endpoint for Kinesis (derived from the AWS region setting if not set). */
 	public static final String AWS_ENDPOINT = "aws.endpoint";
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index 7c31af4..8362776 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -17,13 +17,14 @@
 
 package org.apache.flink.streaming.connectors.kinesis.config;
 
-import com.amazonaws.services.kinesis.model.ShardIteratorType;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer;
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+
 /**
- * Optional consumer specific configuration keys and default values for {@link FlinkKinesisConsumer}
+ * Optional consumer specific configuration keys and default values for {@link FlinkKinesisConsumer}.
  */
 public class ConsumerConfigConstants extends AWSConfigConstants {
 
@@ -33,13 +34,13 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
 	 */
 	public enum InitialPosition {
 
-		/** Start reading from the earliest possible record in the stream (excluding expired data records) */
+		/** Start reading from the earliest possible record in the stream (excluding expired data records). */
 		TRIM_HORIZON(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM),
 
-		/** Start reading from the latest incoming record */
+		/** Start reading from the latest incoming record. */
 		LATEST(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM),
 
-		/** Start reading from the record at the specified timestamp */
+		/** Start reading from the record at the specified timestamp. */
 		AT_TIMESTAMP(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM);
 
 		private SentinelSequenceNumber sentinelSequenceNumber;
@@ -53,55 +54,55 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
 		}
 	}
 
-	/** The initial position to start reading Kinesis streams from (LATEST is used if not set) */
+	/** The initial position to start reading Kinesis streams from (LATEST is used if not set). */
 	public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";
 
-	/** The initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION) */
+	/** The initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */
 	public static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.initpos.timestamp";
 
-	/** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION) */
+	/** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */
 	public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";
 
-	/** The base backoff time between each describeStream attempt */
+	/** The base backoff time between each describeStream attempt. */
 	public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
 
-	/** The maximum backoff time between each describeStream attempt */
+	/** The maximum backoff time between each describeStream attempt. */
 	public static final String STREAM_DESCRIBE_BACKOFF_MAX = "flink.stream.describe.backoff.max";
 
-	/** The power constant for exponential backoff between each describeStream attempt */
+	/** The power constant for exponential backoff between each describeStream attempt. */
 	public static final String STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = "flink.stream.describe.backoff.expconst";
 
-	/** The maximum number of records to try to get each time we fetch records from a AWS Kinesis shard */
+	/** The maximum number of records to try to get each time we fetch records from a AWS Kinesis shard. */
 	public static final String SHARD_GETRECORDS_MAX = "flink.shard.getrecords.maxrecordcount";
 
-	/** The maximum number of getRecords attempts if we get ProvisionedThroughputExceededException */
+	/** The maximum number of getRecords attempts if we get ProvisionedThroughputExceededException. */
 	public static final String SHARD_GETRECORDS_RETRIES = "flink.shard.getrecords.maxretries";
 
-	/** The base backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException */
+	/** The base backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException. */
 	public static final String SHARD_GETRECORDS_BACKOFF_BASE = "flink.shard.getrecords.backoff.base";
 
-	/** The maximum backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException */
+	/** The maximum backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException. */
 	public static final String SHARD_GETRECORDS_BACKOFF_MAX = "flink.shard.getrecords.backoff.max";
 
-	/** The power constant for exponential backoff between each getRecords attempt */
+	/** The power constant for exponential backoff between each getRecords attempt. */
 	public static final String SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT = "flink.shard.getrecords.backoff.expconst";
 
-	/** The interval between each getRecords request to a AWS Kinesis shard in milliseconds */
+	/** The interval between each getRecords request to a AWS Kinesis shard in milliseconds. */
 	public static final String SHARD_GETRECORDS_INTERVAL_MILLIS = "flink.shard.getrecords.intervalmillis";
 
-	/** The maximum number of getShardIterator attempts if we get ProvisionedThroughputExceededException */
+	/** The maximum number of getShardIterator attempts if we get ProvisionedThroughputExceededException. */
 	public static final String SHARD_GETITERATOR_RETRIES = "flink.shard.getiterator.maxretries";
 
-	/** The base backoff time between getShardIterator attempts if we get a ProvisionedThroughputExceededException */
+	/** The base backoff time between getShardIterator attempts if we get a ProvisionedThroughputExceededException. */
 	public static final String SHARD_GETITERATOR_BACKOFF_BASE = "flink.shard.getiterator.backoff.base";
 
-	/** The maximum backoff time between getShardIterator attempts if we get a ProvisionedThroughputExceededException */
+	/** The maximum backoff time between getShardIterator attempts if we get a ProvisionedThroughputExceededException. */
 	public static final String SHARD_GETITERATOR_BACKOFF_MAX = "flink.shard.getiterator.backoff.max";
 
-	/** The power constant for exponential backoff between each getShardIterator attempt */
+	/** The power constant for exponential backoff between each getShardIterator attempt. */
 	public static final String SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT = "flink.shard.getiterator.backoff.expconst";
 
-	/** The interval between each attempt to discover new shards */
+	/** The interval between each attempt to discover new shards. */
 	public static final String SHARD_DISCOVERY_INTERVAL_MILLIS = "flink.shard.discovery.intervalmillis";
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
index 1edddfc..d131150 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis.config;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
 
 /**
- * Optional producer specific configuration keys for {@link FlinkKinesisProducer}
+ * Optional producer specific configuration keys for {@link FlinkKinesisProducer}.
  */
 public class ProducerConfigConstants extends AWSConfigConstants {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
index 55668c6..b1ac057 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.kinesis.examples;
 
 import org.apache.flink.api.java.utils.ParameterTool;
@@ -26,7 +27,7 @@ import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import java.util.Properties;
 
 /**
- * This is an example on how to consume data from Kinesis
+ * This is an example on how to consume data from Kinesis.
  */
 public class ConsumeFromKinesis {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
index d178137..ee031eb 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java
@@ -14,9 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.kinesis.examples;
 
-import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -25,10 +25,12 @@ import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
 import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 
+import org.apache.commons.lang3.RandomStringUtils;
+
 import java.util.Properties;
 
 /**
- * This is an example on how to produce data into Kinesis
+ * This is an example on how to produce data into Kinesis.
  */
 public class ProduceIntoKinesis {
 
@@ -57,13 +59,16 @@ public class ProduceIntoKinesis {
 		see.execute();
 	}
 
+	/**
+	 * Data generator that creates strings starting with a sequence number followed by a dash and 12 random characters.
+	 */
 	public static class EventsGenerator implements SourceFunction<String> {
 		private boolean running = true;
 
 		@Override
 		public void run(SourceContext<String> ctx) throws Exception {
 			long seq = 0;
-			while(running) {
+			while (running) {
 				Thread.sleep(10);
 				ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12));
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 11ac6d4..bbfbb20 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -17,30 +17,30 @@
 
 package org.apache.flink.streaming.connectors.kinesis.internals;
 
-import com.amazonaws.services.kinesis.model.HashKeyRange;
-import com.amazonaws.services.kinesis.model.SequenceNumberRange;
-import com.amazonaws.services.kinesis.model.Shard;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
 import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 import org.apache.flink.util.InstantiationUtil;
+
+import com.amazonaws.services.kinesis.model.HashKeyRange;
+import com.amazonaws.services.kinesis.model.SequenceNumberRange;
+import com.amazonaws.services.kinesis.model.Shard;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -77,10 +77,10 @@ public class KinesisDataFetcher<T> {
 	//  Consumer-wide settings
 	// ------------------------------------------------------------------------
 
-	/** Configuration properties for the Flink Kinesis Consumer */
+	/** Configuration properties for the Flink Kinesis Consumer. */
 	private final Properties configProps;
 
-	/** The list of Kinesis streams that the consumer is subscribing to */
+	/** The list of Kinesis streams that the consumer is subscribing to. */
 	private final List<String> streams;
 
 	/**
@@ -94,7 +94,7 @@ public class KinesisDataFetcher<T> {
 	//  Subtask-specific settings
 	// ------------------------------------------------------------------------
 
-	/** Runtime context of the subtask that this fetcher was created in */
+	/** Runtime context of the subtask that this fetcher was created in. */
 	private final RuntimeContext runtimeContext;
 
 	private final int totalNumberOfConsumerSubtasks;
@@ -105,7 +105,7 @@ public class KinesisDataFetcher<T> {
 	//  Executor services to run created threads
 	// ------------------------------------------------------------------------
 
-	/** Executor service to run {@link ShardConsumer}s to consume Kinesis shards */
+	/** Executor service to run {@link ShardConsumer}s to consume Kinesis shards. */
 	private final ExecutorService shardConsumersExecutor;
 
 	// ------------------------------------------------------------------------
@@ -135,22 +135,22 @@ public class KinesisDataFetcher<T> {
 
 	private final SourceFunction.SourceContext<T> sourceContext;
 
-	/** Checkpoint lock, also used to synchronize operations on subscribedShardsState */
+	/** Checkpoint lock, also used to synchronize operations on subscribedShardsState. */
 	private final Object checkpointLock;
 
-	/** Reference to the first error thrown by any of the {@link ShardConsumer} threads */
+	/** Reference to the first error thrown by any of the {@link ShardConsumer} threads. */
 	private final AtomicReference<Throwable> error;
 
-	/** The Kinesis proxy that the fetcher will be using to discover new shards */
+	/** The Kinesis proxy that the fetcher will be using to discover new shards. */
 	private final KinesisProxyInterface kinesis;
 
-	/** Thread that executed runFetcher() */
+	/** Thread that executed runFetcher(). */
 	private volatile Thread mainThread;
 
 	/**
 	 * The current number of shards that are actively read by this fetcher.
 	 *
-	 * This value is updated in {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)},
+	 * <p>This value is updated in {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)},
 	 * and {@link KinesisDataFetcher#updateState(int, SequenceNumber)}.
 	 */
 	private final AtomicInteger numberOfActiveShards = new AtomicInteger(0);
@@ -183,7 +183,7 @@ public class KinesisDataFetcher<T> {
 			KinesisProxy.create(configProps));
 	}
 
-	/** This constructor is exposed for testing purposes */
+	/** This constructor is exposed for testing purposes. */
 	protected KinesisDataFetcher(List<String> streams,
 								SourceFunction.SourceContext<T> sourceContext,
 								Object checkpointLock,
@@ -381,9 +381,9 @@ public class KinesisDataFetcher<T> {
 		shardConsumersExecutor.shutdownNow();
 	}
 
-	/** After calling {@link KinesisDataFetcher#shutdownFetcher()}, this can be called to await the fetcher shutdown */
+	/** After calling {@link KinesisDataFetcher#shutdownFetcher()}, this can be called to await the fetcher shutdown. */
 	public void awaitTermination() throws InterruptedException {
-		while(!shardConsumersExecutor.isTerminated()) {
+		while (!shardConsumersExecutor.isTerminated()) {
 			Thread.sleep(50);
 		}
 	}
@@ -400,7 +400,7 @@ public class KinesisDataFetcher<T> {
 	//  Functions that update the subscribedStreamToLastDiscoveredShardIds state
 	// ------------------------------------------------------------------------
 
-	/** Updates the last discovered shard of a subscribed stream; only updates if the update is valid */
+	/** Updates the last discovered shard of a subscribed stream; only updates if the update is valid. */
 	public void advanceLastDiscoveredShardOfStream(String stream, String shardId) {
 		String lastSeenShardIdOfStream = this.subscribedStreamsToLastDiscoveredShardIds.get(stream);
 
@@ -417,7 +417,7 @@ public class KinesisDataFetcher<T> {
 	/**
 	 * A utility function that does the following:
 	 *
-	 * 1. Find new shards for each stream that we haven't seen before
+	 * <p>1. Find new shards for each stream that we haven't seen before
 	 * 2. For each new shard, determine whether this consumer subtask should subscribe to them;
 	 * 	  if yes, it is added to the returned list of shards
 	 * 3. Update the subscribedStreamsToLastDiscoveredShardIds state so that we won't get shards
@@ -538,7 +538,7 @@ public class KinesisDataFetcher<T> {
 				this.numberOfActiveShards.incrementAndGet();
 			}
 
-			return subscribedShardsState.size()-1;
+			return subscribedShardsState.size() - 1;
 		}
 	}
 
@@ -574,7 +574,7 @@ public class KinesisDataFetcher<T> {
 
 	/**
 	 * Utility function to create an initial map of the last discovered shard id of each subscribed stream, set to null;
-	 * This is called in the constructor; correct values will be set later on by calling advanceLastDiscoveredShardOfStream()
+	 * This is called in the constructor; correct values will be set later on by calling advanceLastDiscoveredShardOfStream().
 	 *
 	 * @param streams the list of subscribed streams
 	 * @return the initial map for subscribedStreamsToLastDiscoveredShardIds
@@ -588,7 +588,7 @@ public class KinesisDataFetcher<T> {
 	}
 
 	/**
-	 * Utility function to convert {@link StreamShardHandle} into {@link StreamShardMetadata}
+	 * Utility function to convert {@link StreamShardHandle} into {@link StreamShardMetadata}.
 	 *
 	 * @param streamShardHandle the {@link StreamShardHandle} to be converted
 	 * @return a {@link StreamShardMetadata} object
@@ -615,7 +615,7 @@ public class KinesisDataFetcher<T> {
 	}
 
 	/**
-	 * Utility function to convert {@link StreamShardMetadata} into {@link StreamShardHandle}
+	 * Utility function to convert {@link StreamShardMetadata} into {@link StreamShardHandle}.
 	 *
 	 * @param streamShardMetadata the {@link StreamShardMetadata} to be converted
 	 * @return a {@link StreamShardHandle} object

http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index a724b49..2d48e5f 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -17,19 +17,20 @@
 
 package org.apache.flink.streaming.connectors.kinesis.internals;
 
-import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
-import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
-import com.amazonaws.services.kinesis.model.GetRecordsResult;
-import com.amazonaws.services.kinesis.model.Record;
-import com.amazonaws.services.kinesis.model.ShardIteratorType;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
-import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+
+import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -88,7 +89,7 @@ public class ShardConsumer<T> implements Runnable {
 			KinesisProxy.create(fetcherRef.getConsumerConfiguration()));
 	}
 
-	/** This constructor is exposed for testing purposes */
+	/** This constructor is exposed for testing purposes. */
 	protected ShardConsumer(KinesisDataFetcher<T> fetcherRef,
 							Integer subscribedShardStateIndex,
 							StreamShardHandle subscribedShard,
@@ -186,7 +187,7 @@ public class ShardConsumer<T> implements Runnable {
 				}
 			}
 
-			while(isRunning()) {
+			while (isRunning()) {
 				if (nextShardItr == null) {
 					fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
 
@@ -233,7 +234,7 @@ public class ShardConsumer<T> implements Runnable {
 	 * {@link ShardConsumer#getRecords(String, int)} may be able to use the correct sequence number to refresh shard
 	 * iterators if necessary.
 	 *
-	 * Note that the server-side Kinesis timestamp is attached to the record when collected. When the
+	 * <p>Note that the server-side Kinesis timestamp is attached to the record when collected. When the
 	 * user programs uses {@link TimeCharacteristic#EventTime}, this timestamp will be used by default.
 	 *
 	 * @param record record to deserialize and collect
@@ -275,7 +276,7 @@ public class ShardConsumer<T> implements Runnable {
 	 * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should
 	 * be used for the next call to this method.
 	 *
-	 * Note: it is important that this method is not called again before all the records from the last result have been
+	 * <p>Note: it is important that this method is not called again before all the records from the last result have been
 	 * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise
 	 * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to
 	 * incorrect shard iteration if the iterator had to be refreshed.

http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
index 592e30d..22bfbf5 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java
@@ -37,7 +37,7 @@ public class KinesisStreamShard implements Serializable {
 	private final int cachedHash;
 
 	/**
-	 * Create a new KinesisStreamShard
+	 * Create a new KinesisStreamShard.
 	 *
 	 * @param streamName
 	 *           the name of the Kinesis stream that this shard belongs to
@@ -96,7 +96,7 @@ public class KinesisStreamShard implements Serializable {
 	}
 
 	/**
-	 * Utility function to compare two shard ids
+	 * Utility function to compare two shard ids.
 	 *
 	 * @param firstShardId first shard id to compare
 	 * @param secondShardId second shard id to compare
@@ -126,7 +126,9 @@ public class KinesisStreamShard implements Serializable {
 	 * @return whether the shard id is valid
 	 */
 	public static boolean isValidShardId(String shardId) {
-		if (shardId == null) { return false; }
+		if (shardId == null) {
+			return false;
+		}
 		return shardId.matches("^shardId-\\d{12}");
 	}
 
@@ -148,7 +150,7 @@ public class KinesisStreamShard implements Serializable {
 			streamShardMetadata.setStartingHashKey(kinesisStreamShard.getShard().getHashKeyRange().getStartingHashKey());
 			streamShardMetadata.setEndingHashKey(kinesisStreamShard.getShard().getHashKeyRange().getEndingHashKey());
 		}
-		
+
 		if (kinesisStreamShard.getShard().getSequenceNumberRange() != null) {
 			streamShardMetadata.setStartingSequenceNumber(kinesisStreamShard.getShard().getSequenceNumberRange().getStartingSequenceNumber());
 			streamShardMetadata.setEndingSequenceNumber(kinesisStreamShard.getShard().getSequenceNumberRange().getEndingSequenceNumber());

http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
index 4b1cc1c..fbd2e47 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShardState.java
@@ -17,9 +17,10 @@
 
 package org.apache.flink.streaming.connectors.kinesis.model;
 
-import com.amazonaws.services.kinesis.model.Shard;
 import org.apache.flink.util.Preconditions;
 
+import com.amazonaws.services.kinesis.model.Shard;
+
 /**
  * A wrapper class that bundles a {@link StreamShardHandle} with its last processed sequence number.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
index 7f9dbbb..a5398e4 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SentinelSequenceNumber.java
@@ -28,20 +28,20 @@ import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetche
 public enum SentinelSequenceNumber {
 
 	/** Flag value for shard's sequence numbers to indicate that the
-	 * shard should start to be read from the latest incoming records */
-	SENTINEL_LATEST_SEQUENCE_NUM( new SequenceNumber("LATEST_SEQUENCE_NUM") ),
+	 * shard should start to be read from the latest incoming records. */
+	SENTINEL_LATEST_SEQUENCE_NUM(new SequenceNumber("LATEST_SEQUENCE_NUM")),
 
 	/** Flag value for shard's sequence numbers to indicate that the shard should
-	 * start to be read from the earliest records that haven't expired yet */
-	SENTINEL_EARLIEST_SEQUENCE_NUM( new SequenceNumber("EARLIEST_SEQUENCE_NUM") ),
+	 * start to be read from the earliest records that haven't expired yet. */
+	SENTINEL_EARLIEST_SEQUENCE_NUM(new SequenceNumber("EARLIEST_SEQUENCE_NUM")),
 
 	/** Flag value for shard's sequence numbers to indicate that the shard should
-	 * start to be read from the specified timestamp */
-	SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM( new SequenceNumber("AT_TIMESTAMP_SEQUENCE_NUM") ),
+	 * start to be read from the specified timestamp. */
+	SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM(new SequenceNumber("AT_TIMESTAMP_SEQUENCE_NUM")),
 
 	/** Flag value to indicate that we have already read the last record of this shard
-	 * (Note: Kinesis shards that have been closed due to a split or merge will have an ending data record) */
-	SENTINEL_SHARD_ENDING_SEQUENCE_NUM( new SequenceNumber("SHARD_ENDING_SEQUENCE_NUM") );
+	 * (Note: Kinesis shards that have been closed due to a split or merge will have an ending data record). */
+	SENTINEL_SHARD_ENDING_SEQUENCE_NUM(new SequenceNumber("SHARD_ENDING_SEQUENCE_NUM"));
 
 	private SequenceNumber sentinel;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardHandle.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardHandle.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardHandle.java
index d340a88..767c227 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardHandle.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardHandle.java
@@ -34,7 +34,7 @@ public class StreamShardHandle {
 	private final int cachedHash;
 
 	/**
-	 * Create a new StreamShardHandle
+	 * Create a new StreamShardHandle.
 	 *
 	 * @param streamName
 	 *           the name of the Kinesis stream that this shard belongs to
@@ -93,7 +93,7 @@ public class StreamShardHandle {
 	}
 
 	/**
-	 * Utility function to compare two shard ids
+	 * Utility function to compare two shard ids.
 	 *
 	 * @param firstShardId first shard id to compare
 	 * @param secondShardId second shard id to compare
@@ -123,7 +123,9 @@ public class StreamShardHandle {
 	 * @return whether the shard id is valid
 	 */
 	public static boolean isValidShardId(String shardId) {
-		if (shardId == null) { return false; }
+		if (shardId == null) {
+			return false;
+		}
 		return shardId.matches("^shardId-\\d{12}");
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
index aadb31c..fcfb3ac 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/GetShardListResult.java
@@ -19,9 +19,9 @@ package org.apache.flink.streaming.connectors.kinesis.proxy;
 
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 70c1286..89e9f04 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -17,34 +17,36 @@
 
 package org.apache.flink.streaming.connectors.kinesis.proxy;
 
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.kinesis.AmazonKinesisClient;
 import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
 import com.amazonaws.services.kinesis.model.DescribeStreamResult;
 import com.amazonaws.services.kinesis.model.GetRecordsRequest;
 import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
 import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
 import com.amazonaws.services.kinesis.model.LimitExceededException;
 import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
 import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
-import com.amazonaws.services.kinesis.model.StreamStatus;
 import com.amazonaws.services.kinesis.model.Shard;
-import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
-import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
-import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import com.amazonaws.services.kinesis.model.StreamStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Properties;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Random;
-import java.util.Date;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -53,7 +55,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * calls to AWS Kinesis for several functions, such as getting a list of shards and
  * fetching a batch of data records starting from a specified record sequence number.
  *
- * NOTE:
+ * <p>NOTE:
  * In the AWS KCL library, there is a similar implementation - {@link com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy}.
  * This implementation differs mainly in that we can make operations to arbitrary Kinesis streams, which is a needed
  * functionality for the Flink Kinesis Connecter since the consumer may simultaneously read from multiple Kinesis streams.
@@ -62,59 +64,59 @@ public class KinesisProxy implements KinesisProxyInterface {
 
 	private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class);
 
-	/** The actual Kinesis client from the AWS SDK that we will be using to make calls */
+	/** The actual Kinesis client from the AWS SDK that we will be using to make calls. */
 	private final AmazonKinesisClient kinesisClient;
 
-	/** Random seed used to calculate backoff jitter for Kinesis operations */
-	private final static Random seed = new Random();
+	/** Random seed used to calculate backoff jitter for Kinesis operations. */
+	private static final Random seed = new Random();
 
 	// ------------------------------------------------------------------------
 	//  describeStream() related performance settings
 	// ------------------------------------------------------------------------
 
-	/** Base backoff millis for the describe stream operation */
+	/** Base backoff millis for the describe stream operation. */
 	private final long describeStreamBaseBackoffMillis;
 
-	/** Maximum backoff millis for the describe stream operation */
+	/** Maximum backoff millis for the describe stream operation. */
 	private final long describeStreamMaxBackoffMillis;
 
-	/** Exponential backoff power constant for the describe stream operation */
+	/** Exponential backoff power constant for the describe stream operation. */
 	private final double describeStreamExpConstant;
 
 	// ------------------------------------------------------------------------
 	//  getRecords() related performance settings
 	// ------------------------------------------------------------------------
 
-	/** Base backoff millis for the get records operation */
+	/** Base backoff millis for the get records operation. */
 	private final long getRecordsBaseBackoffMillis;
 
-	/** Maximum backoff millis for the get records operation */
+	/** Maximum backoff millis for the get records operation. */
 	private final long getRecordsMaxBackoffMillis;
 
-	/** Exponential backoff power constant for the get records operation */
+	/** Exponential backoff power constant for the get records operation. */
 	private final double getRecordsExpConstant;
 
-	/** Maximum attempts for the get records operation */
+	/** Maximum attempts for the get records operation. */
 	private final int getRecordsMaxAttempts;
 
 	// ------------------------------------------------------------------------
 	//  getShardIterator() related performance settings
 	// ------------------------------------------------------------------------
 
-	/** Base backoff millis for the get shard iterator operation */
+	/** Base backoff millis for the get shard iterator operation. */
 	private final long getShardIteratorBaseBackoffMillis;
 
-	/** Maximum backoff millis for the get shard iterator operation */
+	/** Maximum backoff millis for the get shard iterator operation. */
 	private final long getShardIteratorMaxBackoffMillis;
 
-	/** Exponential backoff power constant for the get shard iterator operation */
+	/** Exponential backoff power constant for the get shard iterator operation. */
 	private final double getShardIteratorExpConstant;
 
-	/** Maximum attempts for the get shard iterator operation */
+	/** Maximum attempts for the get shard iterator operation. */
 	private final int getShardIteratorMaxAttempts;
 
 	/**
-	 * Create a new KinesisProxy based on the supplied configuration properties
+	 * Create a new KinesisProxy based on the supplied configuration properties.
 	 *
 	 * @param configProps configuration properties containing AWS credential and AWS region info
 	 */
@@ -225,7 +227,7 @@ public class KinesisProxy implements KinesisProxyInterface {
 	public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) throws InterruptedException {
 		GetShardListResult result = new GetShardListResult();
 
-		for (Map.Entry<String,String> streamNameWithLastSeenShardId : streamNamesWithLastSeenShardIds.entrySet()) {
+		for (Map.Entry<String, String> streamNameWithLastSeenShardId : streamNamesWithLastSeenShardIds.entrySet()) {
 			String stream = streamNameWithLastSeenShardId.getKey();
 			String lastSeenShardId = streamNameWithLastSeenShardId.getValue();
 			result.addRetrievedShardsToStream(stream, getShardsOfStream(stream, lastSeenShardId));
@@ -294,7 +296,7 @@ public class KinesisProxy implements KinesisProxyInterface {
 
 	/**
 	 * Determines whether the exception is recoverable using exponential-backoff.
-	 * 
+	 *
 	 * @param ex Exception to inspect
 	 * @return <code>true</code> if the exception can be recovered from, else
 	 *         <code>false</code>
@@ -338,7 +340,7 @@ public class KinesisProxy implements KinesisProxyInterface {
 	/**
 	 * Get metainfo for a Kinesis stream, which contains information about which shards this Kinesis stream possess.
 	 *
-	 * This method is using a "full jitter" approach described in AWS's article,
+	 * <p>This method is using a "full jitter" approach described in AWS's article,
 	 * <a href="https://www.awsarchitectureblog.com/2015/03/backoff.html">"Exponential Backoff and Jitter"</a>.
 	 * This is necessary because concurrent calls will be made by all parallel subtask's fetcher. This
 	 * jitter backoff approach will help distribute calls across the fetchers over time.
@@ -395,6 +397,6 @@ public class KinesisProxy implements KinesisProxyInterface {
 
 	private static long fullJitterBackoff(long base, long max, double power, int attempt) {
 		long exponentialBackoff = (long) Math.min(max, base * Math.pow(power, attempt));
-		return (long)(seed.nextDouble()*exponentialBackoff); // random jitter between 0 and the exponential backoff
+		return (long) (seed.nextDouble() * exponentialBackoff); // random jitter between 0 and the exponential backoff
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
index 807a163..0538151 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
@@ -17,9 +17,10 @@
 
 package org.apache.flink.streaming.connectors.kinesis.proxy;
 
-import com.amazonaws.services.kinesis.model.GetRecordsResult;
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+
 import java.util.Map;
 
 /**
@@ -46,7 +47,7 @@ public interface KinesisProxyInterface {
 	String getShardIterator(StreamShardHandle shard, String shardIteratorType, Object startingMarker) throws InterruptedException;
 
 	/**
-	 * Get the next batch of data records using a specific shard iterator
+	 * Get the next batch of data records using a specific shard iterator.
 	 *
 	 * @param shardIterator a shard iterator that encodes info about which shard to read and where to start reading
 	 * @param maxRecordsToGet the maximum amount of records to retrieve for this batch
@@ -67,5 +68,5 @@ public interface KinesisProxyInterface {
 	 *                              operation has exceeded the rate limit; this exception will be thrown
 	 *                              if the backoff is interrupted.
 	 */
-	GetShardListResult getShardList(Map<String,String> streamNamesWithLastSeenShardIds) throws InterruptedException;
+	GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) throws InterruptedException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java
index 0effdd8..b06b20f 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java
@@ -33,7 +33,7 @@ import java.io.Serializable;
 public interface KinesisDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
 
 	/**
-	 * Deserializes a Kinesis record's bytes
+	 * Deserializes a Kinesis record's bytes.
 	 *
 	 * @param recordValue the record's value as a byte array
 	 * @param partitionKey the record's partition key at the time of writing

http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java
index 6e66038..279d410 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java
@@ -23,7 +23,7 @@ import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import java.io.IOException;
 
 /**
- * A simple wrapper for using the {@link DeserializationSchema} with the {@link KinesisDeserializationSchema} interface
+ * A simple wrapper for using the {@link DeserializationSchema} with the {@link KinesisDeserializationSchema} interface.
  *
  * @param <T> The type created by the deserialization schema.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisSerializationSchema.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisSerializationSchema.java
index 03dd72c..9be410a 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisSerializationSchema.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisSerializationSchema.java
@@ -14,8 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.connectors.kinesis.serialization;
 
+package org.apache.flink.streaming.connectors.kinesis.serialization;
 
 import java.io.Serializable;
 import java.nio.ByteBuffer;
@@ -27,7 +27,7 @@ import java.nio.ByteBuffer;
  */
 public interface KinesisSerializationSchema<T> extends Serializable {
 	/**
-	 * Serialize the given element into a ByteBuffer
+	 * Serialize the given element into a ByteBuffer.
 	 *
 	 * @param element The element to serialize
 	 * @return Serialized representation of the element

http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
index a6aad02..5670526 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
@@ -17,6 +17,10 @@
 
 package org.apache.flink.streaming.connectors.kinesis.util;
 
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider;
+
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.ClientConfigurationFactory;
 import com.amazonaws.auth.AWSCredentials;
@@ -29,9 +33,6 @@ import com.amazonaws.auth.profile.ProfileCredentialsProvider;
 import com.amazonaws.regions.Region;
 import com.amazonaws.regions.Regions;
 import com.amazonaws.services.kinesis.AmazonKinesisClient;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider;
 
 import java.util.Properties;
 
@@ -126,7 +127,7 @@ public class AWSUtil {
 	}
 
 	/**
-	 * Checks whether or not a region ID is valid
+	 * Checks whether or not a region ID is valid.
 	 *
 	 * @param region The AWS region ID to check
 	 * @return true if the supplied region ID is valid, false otherwise

http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index 244f5a5..42f1af0 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.connectors.kinesis.util;
 
-import com.amazonaws.regions.Regions;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
 import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
@@ -26,6 +25,8 @@ import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConsta
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
 import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
 
+import com.amazonaws.regions.Regions;
+
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Properties;
@@ -141,7 +142,7 @@ public class KinesisConfigUtil {
 	}
 
 	/**
-	 * Validate configuration properties related to Amazon AWS service
+	 * Validate configuration properties related to Amazon AWS service.
 	 */
 	public static void validateAwsConfiguration(Properties config) {
 		if (config.containsKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
index e24a411..af84420 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
@@ -17,19 +17,20 @@
 
 package org.apache.flink.streaming.connectors.kinesis;
 
-import com.amazonaws.services.kinesis.model.Shard;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
-import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+
+import com.amazonaws.services.kinesis.model.Shard;
 import org.junit.Test;
 
 import java.net.URL;
@@ -37,8 +38,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Properties;
 
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.mockito.Mockito.mock;
 
 /**
@@ -136,11 +137,12 @@ public class FlinkKinesisConsumerMigrationTest {
 		}
 
 		@Override
-		protected KinesisDataFetcher<T> createFetcher(List<String> streams,
-													  	SourceFunction.SourceContext<T> sourceContext,
-													  	RuntimeContext runtimeContext,
-													  	Properties configProps,
-													  	KinesisDeserializationSchema<T> deserializationSchema) {
+		protected KinesisDataFetcher<T> createFetcher(
+				List<String> streams,
+				SourceFunction.SourceContext<T> sourceContext,
+				RuntimeContext runtimeContext,
+				Properties configProps,
+				KinesisDeserializationSchema<T> deserializationSchema) {
 			return mock(KinesisDataFetcher.class);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index 186dfa6..a26e758 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -17,9 +17,6 @@
 
 package org.apache.flink.streaming.connectors.kinesis;
 
-import com.amazonaws.services.kinesis.model.HashKeyRange;
-import com.amazonaws.services.kinesis.model.SequenceNumberRange;
-import com.amazonaws.services.kinesis.model.Shard;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.ListState;
@@ -38,15 +35,19 @@ import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConsta
 import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
-import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import com.amazonaws.services.kinesis.model.HashKeyRange;
+import com.amazonaws.services.kinesis.model.SequenceNumberRange;
+import com.amazonaws.services.kinesis.model.Shard;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -59,21 +60,21 @@ import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.util.HashMap;
-import java.util.Map;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 
-import static org.junit.Assert.fail;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.never;
 
 /**
  * Suite of FlinkKinesisConsumer tests for the methods called throughout the source life cycle.

http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
index 4fb6dd4..2e1adb6 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -17,21 +17,22 @@
 
 package org.apache.flink.streaming.connectors.kinesis.internals;
 
-import com.amazonaws.services.kinesis.model.HashKeyRange;
-import com.amazonaws.services.kinesis.model.SequenceNumberRange;
-import com.amazonaws.services.kinesis.model.Shard;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
-import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisBehavioursFactory;
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDataFetcher;
+
+import com.amazonaws.services.kinesis.model.HashKeyRange;
+import com.amazonaws.services.kinesis.model.SequenceNumberRange;
+import com.amazonaws.services.kinesis.model.Shard;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mockito;
@@ -40,9 +41,9 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.HashMap;
-import java.util.Map;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
@@ -54,6 +55,9 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+/**
+ * Tests for the {@link KinesisDataFetcher}.
+ */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(TestableKinesisDataFetcher.class)
 public class KinesisDataFetcherTest {
@@ -92,10 +96,10 @@ public class KinesisDataFetcherTest {
 		HashMap<String, String> subscribedStreamsToLastSeenShardIdsUnderTest =
 			KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(fakeStreams);
 
-		Map<String,Integer> streamToShardCount = new HashMap<>();
+		Map<String, Integer> streamToShardCount = new HashMap<>();
 		Random rand = new Random();
 		for (String fakeStream : fakeStreams) {
-			streamToShardCount.put(fakeStream, rand.nextInt(5)+1);
+			streamToShardCount.put(fakeStream, rand.nextInt(5) + 1);
 		}
 
 		final TestableKinesisDataFetcher fetcher =
@@ -140,10 +144,10 @@ public class KinesisDataFetcherTest {
 		assertTrue(streamsInState.containsAll(fakeStreams));
 
 		// assert that the last seen shards in state is correctly set
-		for (Map.Entry<String,String> streamToLastSeenShard : subscribedStreamsToLastSeenShardIdsUnderTest.entrySet()) {
+		for (Map.Entry<String, String> streamToLastSeenShard : subscribedStreamsToLastSeenShardIdsUnderTest.entrySet()) {
 			assertTrue(
 				streamToLastSeenShard.getValue().equals(
-					KinesisShardIdGenerator.generateFromShardOrder(streamToShardCount.get(streamToLastSeenShard.getKey())-1)));
+					KinesisShardIdGenerator.generateFromShardOrder(streamToShardCount.get(streamToLastSeenShard.getKey()) - 1)));
 		}
 	}
 
@@ -184,7 +188,7 @@ public class KinesisDataFetcherTest {
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
 			UUID.randomUUID().toString());
 
-		Map<String,Integer> streamToShardCount = new HashMap<>();
+		Map<String, Integer> streamToShardCount = new HashMap<>();
 		streamToShardCount.put("fakeStream1", 3); // fakeStream1 will still have 3 shards after restore
 		streamToShardCount.put("fakeStream2", 2); // fakeStream2 will still have 2 shards after restore
 
@@ -230,10 +234,10 @@ public class KinesisDataFetcherTest {
 		assertTrue(streamsInState.containsAll(fakeStreams));
 
 		// assert that the last seen shards in state is correctly set
-		for (Map.Entry<String,String> streamToLastSeenShard : subscribedStreamsToLastSeenShardIdsUnderTest.entrySet()) {
+		for (Map.Entry<String, String> streamToLastSeenShard : subscribedStreamsToLastSeenShardIdsUnderTest.entrySet()) {
 			assertTrue(
 				streamToLastSeenShard.getValue().equals(
-					KinesisShardIdGenerator.generateFromShardOrder(streamToShardCount.get(streamToLastSeenShard.getKey())-1)));
+					KinesisShardIdGenerator.generateFromShardOrder(streamToShardCount.get(streamToLastSeenShard.getKey()) - 1)));
 		}
 	}
 
@@ -274,9 +278,9 @@ public class KinesisDataFetcherTest {
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
 			UUID.randomUUID().toString());
 
-		Map<String,Integer> streamToShardCount = new HashMap<>();
-		streamToShardCount.put("fakeStream1", 3+1); // fakeStream1 had 3 shards before & 1 new shard after restore
-		streamToShardCount.put("fakeStream2", 2+3); // fakeStream2 had 2 shards before & 3 new shard after restore
+		Map<String, Integer> streamToShardCount = new HashMap<>();
+		streamToShardCount.put("fakeStream1", 3 + 1); // fakeStream1 had 3 shards before & 1 new shard after restore
+		streamToShardCount.put("fakeStream2", 2 + 3); // fakeStream2 had 2 shards before & 3 new shard after restore
 
 		HashMap<String, String> subscribedStreamsToLastSeenShardIdsUnderTest =
 			KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(fakeStreams);
@@ -321,10 +325,10 @@ public class KinesisDataFetcherTest {
 		assertTrue(streamsInState.containsAll(fakeStreams));
 
 		// assert that the last seen shards in state is correctly set
-		for (Map.Entry<String,String> streamToLastSeenShard : subscribedStreamsToLastSeenShardIdsUnderTest.entrySet()) {
+		for (Map.Entry<String, String> streamToLastSeenShard : subscribedStreamsToLastSeenShardIdsUnderTest.entrySet()) {
 			assertTrue(
 				streamToLastSeenShard.getValue().equals(
-					KinesisShardIdGenerator.generateFromShardOrder(streamToShardCount.get(streamToLastSeenShard.getKey())-1)));
+					KinesisShardIdGenerator.generateFromShardOrder(streamToShardCount.get(streamToLastSeenShard.getKey()) - 1)));
 		}
 	}
 
@@ -367,7 +371,7 @@ public class KinesisDataFetcherTest {
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
 			UUID.randomUUID().toString());
 
-		Map<String,Integer> streamToShardCount = new HashMap<>();
+		Map<String, Integer> streamToShardCount = new HashMap<>();
 		streamToShardCount.put("fakeStream1", 3); // fakeStream1 has fixed 3 shards
 		streamToShardCount.put("fakeStream2", 2); // fakeStream2 has fixed 2 shards
 		streamToShardCount.put("fakeStream3", 0); // no shards can be found for fakeStream3
@@ -463,9 +467,9 @@ public class KinesisDataFetcherTest {
 				new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
 			UUID.randomUUID().toString());
 
-		Map<String,Integer> streamToShardCount = new HashMap<>();
-		streamToShardCount.put("fakeStream1", 3+1); // fakeStream1 had 3 shards before & 1 new shard after restore
-		streamToShardCount.put("fakeStream2", 2+3); // fakeStream2 had 2 shards before & 2 new shard after restore
+		Map<String, Integer> streamToShardCount = new HashMap<>();
+		streamToShardCount.put("fakeStream1", 3 + 1); // fakeStream1 had 3 shards before & 1 new shard after restore
+		streamToShardCount.put("fakeStream2", 2 + 3); // fakeStream2 had 2 shards before & 2 new shard after restore
 		streamToShardCount.put("fakeStream3", 0); // no shards can be found for fakeStream3
 		streamToShardCount.put("fakeStream4", 0); // no shards can be found for fakeStream4
 
@@ -569,11 +573,12 @@ public class KinesisDataFetcherTest {
 		}
 
 		@Override
-		protected KinesisDataFetcher<T> createFetcher(List<String> streams,
-													  SourceFunction.SourceContext<T> sourceContext,
-													  RuntimeContext runtimeContext,
-													  Properties configProps,
-													  KinesisDeserializationSchema<T> deserializationSchema) {
+		protected KinesisDataFetcher<T> createFetcher(
+				List<String> streams,
+				SourceFunction.SourceContext<T> sourceContext,
+				RuntimeContext runtimeContext,
+				Properties configProps,
+				KinesisDeserializationSchema<T> deserializationSchema) {
 			return fetcher;
 		}