You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/28 06:17:43 UTC

[13/21] flink git commit: [FLINK-6711] Activate strict checkstyle for flink-connector-kinesis

http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
index b22ba0c..a194835 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
@@ -17,17 +17,18 @@
 
 package org.apache.flink.streaming.connectors.kinesis.internals;
 
-import com.amazonaws.services.kinesis.model.HashKeyRange;
-import com.amazonaws.services.kinesis.model.Shard;
-import org.apache.commons.lang.StringUtils;
-import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisBehavioursFactory;
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDataFetcher;
+
+import com.amazonaws.services.kinesis.model.HashKeyRange;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.commons.lang3.StringUtils;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -39,6 +40,9 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for the {@link ShardConsumer}.
+ */
 public class ShardConsumerTest {
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
index 63c6c2b..2915e2f 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualConsumerProducerTest.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.kinesis.manualtests;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
@@ -37,12 +38,12 @@ import java.util.Properties;
 /**
  * This is a manual test for the AWS Kinesis connector in Flink.
  *
- * It uses:
+ * <p>It uses:
  *  - A custom KinesisSerializationSchema
  *  - A custom KinesisPartitioner
  *
- * Invocation:
- * --region eu-central-1 --accessKey XXXXXXXXXXXX --secretKey XXXXXXXXXXXXXXXX
+ * <p>Invocation:
+ * --region eu-central-1 --accessKey X --secretKey X
  */
 public class ManualConsumerProducerTest {
 
@@ -69,7 +70,7 @@ public class ManualConsumerProducerTest {
 					// every 10th element goes into a different stream
 					@Override
 					public String getTargetStream(String element) {
-						if(element.split("-")[0].endsWith("0")) {
+						if (element.split("-")[0].endsWith("0")) {
 							return "flink-test-2";
 						}
 						return null; // send to default stream
@@ -90,7 +91,6 @@ public class ManualConsumerProducerTest {
 		});
 		simpleStringStream.addSink(kinesis);
 
-
 		// consuming topology
 		Properties consumerProps = new Properties();
 		consumerProps.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey"));
@@ -104,13 +104,13 @@ public class ManualConsumerProducerTest {
 				String[] parts = value.split("-");
 				try {
 					long l = Long.parseLong(parts[0]);
-					if(l < 0) {
+					if (l < 0) {
 						throw new RuntimeException("Negative");
 					}
-				} catch(NumberFormatException nfe) {
+				} catch (NumberFormatException nfe) {
 					throw new RuntimeException("First part of '" + value + "' is not a valid numeric type");
 				}
-				if(parts[1].length() != 12) {
+				if (parts[1].length() != 12) {
 					throw new RuntimeException("Second part of '" + value + "' doesn't have 12 characters");
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
index 2e452c1..7abcd3c 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
@@ -14,10 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.kinesis.manualtests;
 
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
-import com.amazonaws.services.kinesis.model.DescribeStreamResult;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -26,6 +25,9 @@ import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValidatingConsumerThread;
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisEventsGeneratorProducerThread;
 import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,8 +40,8 @@ import java.util.concurrent.atomic.AtomicReference;
  * Then, it starts a consuming topology, ensuring that all records up to a certain
  * point have been seen.
  *
- * Invocation:
- * --region eu-central-1 --accessKey XXXXXXXXXXXX --secretKey XXXXXXXXXXXXXXXX
+ * <p>Invocation:
+ * --region eu-central-1 --accessKey X --secretKey X
  */
 public class ManualExactlyOnceTest {
 
@@ -67,8 +69,8 @@ public class ManualExactlyOnceTest {
 
 		// wait until stream has been created
 		DescribeStreamResult status = client.describeStream(streamName);
-		LOG.info("status {}" ,status);
-		while(!status.getStreamDescription().getStreamStatus().equals("ACTIVE")) {
+		LOG.info("status {}" , status);
+		while (!status.getStreamDescription().getStreamStatus().equals("ACTIVE")) {
 			status = client.describeStream(streamName);
 			LOG.info("Status of stream {}", status);
 			Thread.sleep(1000);

http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
index 71bcae3..226ac3e 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
@@ -17,13 +17,6 @@
 
 package org.apache.flink.streaming.connectors.kinesis.manualtests;
 
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
-import com.amazonaws.services.kinesis.model.DescribeStreamResult;
-import com.amazonaws.services.kinesis.model.LimitExceededException;
-import com.amazonaws.services.kinesis.model.PutRecordsRequest;
-import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
-import com.amazonaws.services.kinesis.model.PutRecordsResult;
-import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -32,6 +25,14 @@ import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConsta
 import org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValidatingConsumerThread;
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
 import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.LimitExceededException;
+import com.amazonaws.services.kinesis.model.PutRecordsRequest;
+import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
+import com.amazonaws.services.kinesis.model.PutRecordsResult;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,8 +50,8 @@ import java.util.concurrent.atomic.AtomicReference;
  * point have been seen. While the data generator and consuming topology is running,
  * the kinesis stream is resharded two times.
  *
- * Invocation:
- * --region eu-central-1 --accessKey XXXXXXXXXXXX --secretKey XXXXXXXXXXXXXXXX
+ * <p>Invocation:
+ * --region eu-central-1 --accessKey X --secretKey X
  */
 public class ManualExactlyOnceWithStreamReshardingTest {
 
@@ -80,7 +81,7 @@ public class ManualExactlyOnceWithStreamReshardingTest {
 		// wait until stream has been created
 		DescribeStreamResult status = client.describeStream(streamName);
 		LOG.info("status {}", status);
-		while(!status.getStreamDescription().getStreamStatus().equals("ACTIVE")) {
+		while (!status.getStreamDescription().getStreamStatus().equals("ACTIVE")) {
 			status = client.describeStream(streamName);
 			LOG.info("Status of stream {}", status);
 			Thread.sleep(1000);
@@ -113,7 +114,7 @@ public class ManualExactlyOnceWithStreamReshardingTest {
 							Thread.sleep(10);
 
 							Set<PutRecordsRequestEntry> batch = new HashSet<>();
-							for (int i=count; i<count+batchSize; i++) {
+							for (int i = count; i < count + batchSize; i++) {
 								if (i >= TOTAL_EVENT_COUNT) {
 									break;
 								}

http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
index 81d0bec..8abf4bb 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualProducerTest.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.kinesis.manualtests;
 
 import org.apache.flink.api.java.utils.ParameterTool;
@@ -32,14 +33,14 @@ import java.util.Properties;
 /**
  * This is a manual test for the AWS Kinesis connector in Flink.
  *
- * It uses:
+ * <p>It uses:
  *  - A custom KinesisSerializationSchema
  *  - A custom KinesisPartitioner
  *
- *  The streams "test-flink" and "flink-test-2" must exist.
+ * <p>The streams "test-flink" and "flink-test-2" must exist.
  *
- * Invocation:
- * --region eu-central-1 --accessKey XXXXXXXXXXXX --secretKey XXXXXXXXXXXXXXXX
+ * <p>Invocation:
+ * --region eu-central-1 --accessKey X --secretKey X
  */
 public class ManualProducerTest {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
index 86202c5..7ca05d7 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
@@ -17,15 +17,14 @@
 
 package org.apache.flink.streaming.connectors.kinesis.proxy;
 
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
-
-import org.junit.Test;
-
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.AmazonServiceException.ErrorType;
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
 import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test for methods in the {@link KinesisProxy} class.

http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java
index 157964c..75356ef 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.java
@@ -29,6 +29,7 @@ import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConsta
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,7 +41,7 @@ import static org.apache.flink.test.util.TestUtils.tryExecute;
 
 /**
  * A thread that runs a topology with the FlinkKinesisConsumer as source, followed by two flat map
- * functions, one that performs artificial failures and another that validates exactly-once guarantee
+ * functions, one that performs artificial failures and another that validates exactly-once guarantee.
  */
 public class ExactlyOnceValidatingConsumerThread {
 
@@ -94,7 +95,7 @@ public class ExactlyOnceValidatingConsumerThread {
 		return new Thread(exactlyOnceValidationConsumer);
 	}
 
-	private static class ExactlyOnceValidatingMapper implements FlatMapFunction<String,String>, Checkpointed<BitSet> {
+	private static class ExactlyOnceValidatingMapper implements FlatMapFunction<String, String>, Checkpointed<BitSet> {
 
 		private static final Logger LOG = LoggerFactory.getLogger(ExactlyOnceValidatingMapper.class);
 
@@ -111,15 +112,15 @@ public class ExactlyOnceValidatingConsumerThread {
 			LOG.info("Consumed {}", value);
 
 			int id = Integer.parseInt(value.split("-")[0]);
-			if(validator.get(id)) {
-				throw new RuntimeException("Saw id " + id +" twice!");
+			if (validator.get(id)) {
+				throw new RuntimeException("Saw id " + id + " twice!");
 			}
 			validator.set(id);
-			if(id > totalEventCount-1) {
+			if (id > totalEventCount - 1) {
 				throw new RuntimeException("Out of bounds ID observed");
 			}
 
-			if(validator.nextClearBit(0) == totalEventCount) {
+			if (validator.nextClearBit(0) == totalEventCount) {
 				throw new SuccessException();
 			}
 		}
@@ -135,7 +136,7 @@ public class ExactlyOnceValidatingConsumerThread {
 		}
 	}
 
-	private static class ArtificialFailOnceFlatMapper extends RichFlatMapFunction<String,String> {
+	private static class ArtificialFailOnceFlatMapper extends RichFlatMapFunction<String, String> {
 		int count = 0;
 
 		private final int failAtRecordCount;

http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
index ce5a0de..2fda0d5 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
@@ -17,15 +17,16 @@
 
 package org.apache.flink.streaming.connectors.kinesis.testutils;
 
-import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
-import com.amazonaws.services.kinesis.model.GetRecordsResult;
-import com.amazonaws.services.kinesis.model.Record;
-import com.amazonaws.services.kinesis.model.Shard;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.Shard;
+
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Date;
@@ -67,7 +68,7 @@ public class FakeKinesisBehavioursFactory {
 
 	}
 
-	public static KinesisProxyInterface nonReshardedStreamsBehaviour(Map<String,Integer> streamsToShardCount) {
+	public static KinesisProxyInterface nonReshardedStreamsBehaviour(Map<String, Integer> streamsToShardCount) {
 		return new NonReshardedStreamsKinesis(streamsToShardCount);
 
 	}
@@ -79,14 +80,14 @@ public class FakeKinesisBehavioursFactory {
 	public static KinesisProxyInterface totalNumOfRecordsAfterNumOfGetRecordsCalls(final int numOfRecords, final int numOfGetRecordsCalls) {
 		return new SingleShardEmittingFixNumOfRecordsKinesis(numOfRecords, numOfGetRecordsCalls);
 	}
-	
+
 	public static KinesisProxyInterface totalNumOfRecordsAfterNumOfGetRecordsCallsWithUnexpectedExpiredIterator(
 		final int numOfRecords, final int numOfGetRecordsCall, final int orderOfCallToExpire) {
 		return new SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis(
 			numOfRecords, numOfGetRecordsCall, orderOfCallToExpire);
 	}
 
-	public static class SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis extends SingleShardEmittingFixNumOfRecordsKinesis {
+	private static class SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis extends SingleShardEmittingFixNumOfRecordsKinesis {
 
 		private boolean expiredOnceAlready = false;
 		private boolean expiredIteratorRefreshed = false;
@@ -103,7 +104,7 @@ public class FakeKinesisBehavioursFactory {
 
 		@Override
 		public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
-			if ((Integer.valueOf(shardIterator) == orderOfCallToExpire-1) && !expiredOnceAlready) {
+			if ((Integer.valueOf(shardIterator) == orderOfCallToExpire - 1) && !expiredOnceAlready) {
 				// we fake only once the expired iterator exception at the specified get records attempt order
 				expiredOnceAlready = true;
 				throw new ExpiredIteratorException("Artificial expired shard iterator");
@@ -130,7 +131,7 @@ public class FakeKinesisBehavioursFactory {
 				// fake the iterator refresh when this is called again after getRecords throws expired iterator
 				// exception on the orderOfCallToExpire attempt
 				expiredIteratorRefreshed = true;
-				return String.valueOf(orderOfCallToExpire-1);
+				return String.valueOf(orderOfCallToExpire - 1);
 			}
 		}
 	}
@@ -141,7 +142,7 @@ public class FakeKinesisBehavioursFactory {
 
 		protected final int totalNumOfRecords;
 
-		protected final Map<String,List<Record>> shardItrToRecordBatch;
+		protected final Map<String, List<Record>> shardItrToRecordBatch;
 
 		public SingleShardEmittingFixNumOfRecordsKinesis(final int numOfRecords, final int numOfGetRecordsCalls) {
 			this.totalNumOfRecords = numOfRecords;
@@ -151,9 +152,9 @@ public class FakeKinesisBehavioursFactory {
 			this.shardItrToRecordBatch = new HashMap<>();
 
 			int numOfAlreadyPartitionedRecords = 0;
-			int numOfRecordsPerBatch = numOfRecords/numOfGetRecordsCalls + 1;
-			for (int batch=0; batch<totalNumOfGetRecordsCalls; batch++) {
-				if (batch != totalNumOfGetRecordsCalls-1) {
+			int numOfRecordsPerBatch = numOfRecords / numOfGetRecordsCalls + 1;
+			for (int batch = 0; batch < totalNumOfGetRecordsCalls; batch++) {
+				if (batch != totalNumOfGetRecordsCalls - 1) {
 					shardItrToRecordBatch.put(
 						String.valueOf(batch),
 						createRecordBatchWithRange(
@@ -176,8 +177,8 @@ public class FakeKinesisBehavioursFactory {
 			return new GetRecordsResult()
 				.withRecords(shardItrToRecordBatch.get(shardIterator))
 				.withNextShardIterator(
-					(Integer.valueOf(shardIterator) == totalNumOfGetRecordsCalls-1)
-						? null : String.valueOf(Integer.valueOf(shardIterator)+1)); // last next shard iterator is null
+					(Integer.valueOf(shardIterator) == totalNumOfGetRecordsCalls - 1)
+						? null : String.valueOf(Integer.valueOf(shardIterator) + 1)); // last next shard iterator is null
 		}
 
 		@Override
@@ -211,8 +212,8 @@ public class FakeKinesisBehavioursFactory {
 
 		private Map<String, List<StreamShardHandle>> streamsWithListOfShards = new HashMap<>();
 
-		public NonReshardedStreamsKinesis(Map<String,Integer> streamsToShardCount) {
-			for (Map.Entry<String,Integer> streamToShardCount : streamsToShardCount.entrySet()) {
+		public NonReshardedStreamsKinesis(Map<String, Integer> streamsToShardCount) {
+			for (Map.Entry<String, Integer> streamToShardCount : streamsToShardCount.entrySet()) {
 				String streamName = streamToShardCount.getKey();
 				int shardCount = streamToShardCount.getValue();
 
@@ -220,7 +221,7 @@ public class FakeKinesisBehavioursFactory {
 					// don't do anything
 				} else {
 					List<StreamShardHandle> shardsOfStream = new ArrayList<>(shardCount);
-					for (int i=0; i < shardCount; i++) {
+					for (int i = 0; i < shardCount; i++) {
 						shardsOfStream.add(
 							new StreamShardHandle(
 								streamName,

http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java
index fdfdfe1..699c977 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.connectors.kinesis.testutils;
 
-import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -25,6 +24,8 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
 import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import org.apache.commons.lang3.RandomStringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -97,12 +98,12 @@ public class KinesisEventsGeneratorProducerThread {
 		@Override
 		public void run(SourceContext<String> ctx) throws Exception {
 			long seq = 0;
-			while(running) {
+			while (running) {
 				Thread.sleep(10);
 				String evt = (seq++) + "-" + RandomStringUtils.randomAlphabetic(12);
 				ctx.collect(evt);
 				LOG.info("Emitting event {}", evt);
-				if(seq >= limit) {
+				if (seq >= limit) {
 					break;
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisShardIdGenerator.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisShardIdGenerator.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisShardIdGenerator.java
index c8dd347..0377a66 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisShardIdGenerator.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisShardIdGenerator.java
@@ -17,8 +17,12 @@
 
 package org.apache.flink.streaming.connectors.kinesis.testutils;
 
+/**
+ * A generator for Kinesis shard IDs.
+ *
+ * <p>Kinesis shard ids are in the form of: shardId-\d{12}
+ */
 public class KinesisShardIdGenerator {
-	// Kinesis shards ids are in the form of: ^shardId-\d{12}
 	public static String generateFromShardOrder(int order) {
 		return String.format("shardId-%012d", order);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java
index 80ad06c..6c91eaf 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableFlinkKinesisConsumer.java
@@ -20,12 +20,16 @@ package org.apache.flink.streaming.connectors.kinesis.testutils;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.util.Properties;
 
+/**
+ * Extension of the {@link FlinkKinesisConsumer} for testing.
+ */
 public class TestableFlinkKinesisConsumer extends FlinkKinesisConsumer<String> {
 
 	private final RuntimeContext mockedRuntimeCtx;

http://git-wip-us.apache.org/repos/asf/flink/blob/b12de1ed/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
index bb644ba..b6f3cbc 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -37,6 +38,9 @@ import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicReference;
 
+/**
+ * Extension of the {@link KinesisDataFetcher} for testing.
+ */
 public class TestableKinesisDataFetcher extends KinesisDataFetcher<String> {
 
 	private static final Object fakeCheckpointLock = new Object();
@@ -45,14 +49,15 @@ public class TestableKinesisDataFetcher extends KinesisDataFetcher<String> {
 
 	private OneShotLatch runWaiter;
 
-	public TestableKinesisDataFetcher(List<String> fakeStreams,
-									  Properties fakeConfiguration,
-									  int fakeTotalCountOfSubtasks,
-									  int fakeTndexOfThisSubtask,
-									  AtomicReference<Throwable> thrownErrorUnderTest,
-									  LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest,
-									  HashMap<String, String> subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,
-									  KinesisProxyInterface fakeKinesis) {
+	public TestableKinesisDataFetcher(
+			List<String> fakeStreams,
+			Properties fakeConfiguration,
+			int fakeTotalCountOfSubtasks,
+			int fakeTndexOfThisSubtask,
+			AtomicReference<Throwable> thrownErrorUnderTest,
+			LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest,
+			HashMap<String, String> subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,
+			KinesisProxyInterface fakeKinesis) {
 		super(fakeStreams,
 			getMockedSourceContext(),
 			fakeCheckpointLock,