Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/28 06:17:38 UTC

[08/21] flink git commit: [FLINK-6711] Activate strict checkstyle for flink-connector-kafka*

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index ac278fb..03a23f5 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -18,14 +18,6 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
-import kafka.server.KafkaServer;
-import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.FlatMapFunction;
@@ -82,6 +74,15 @@ import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.testutils.junit.RetryOnException;
 import org.apache.flink.testutils.junit.RetryRule;
 import org.apache.flink.util.Collector;
+
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.MessageAndMetadata;
+import kafka.server.KafkaServer;
+import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.junit.Assert;
@@ -90,6 +91,7 @@ import org.junit.Rule;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
+
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
@@ -114,14 +116,15 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-
+/**
+ * Abstract test base for all Kafka consumer tests.
+ */
 @SuppressWarnings("serial")
 public abstract class KafkaConsumerTestBase extends KafkaTestBase {
-	
+
 	@Rule
 	public RetryRule retryRule = new RetryRule();
 
-
 	// ------------------------------------------------------------------------
 	//  Common Test Preparation
 	// ------------------------------------------------------------------------
@@ -134,8 +137,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	public void ensureNoJobIsLingering() throws Exception {
 		JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
 	}
-	
-	
+
 	// ------------------------------------------------------------------------
 	//  Suite of Tests
 	//
@@ -146,7 +148,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 	/**
 	 * Test that ensures the KafkaConsumer is properly failing if the topic doesnt exist
-	 * and a wrong broker was specified
+	 * and a wrong broker was specified.
 	 *
 	 * @throws Exception
 	 */
@@ -173,8 +175,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			DataStream<String> stream = see.addSource(source);
 			stream.print();
 			see.execute("No broker test");
-		} catch(JobExecutionException jee) {
-			if(kafkaServer.getVersion().equals("0.9") || kafkaServer.getVersion().equals("0.10")) {
+		} catch (JobExecutionException jee) {
+			if (kafkaServer.getVersion().equals("0.9") || kafkaServer.getVersion().equals("0.10")) {
 				assertTrue(jee.getCause() instanceof TimeoutException);
 
 				TimeoutException te = (TimeoutException) jee.getCause();
@@ -191,7 +193,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	}
 
 	/**
-	 * Ensures that the committed offsets to Kafka are the offsets of "the next record to process"
+	 * Ensures that the committed offsets to Kafka are the offsets of "the next record to process".
 	 */
 	public void runCommitOffsetsToKafka() throws Exception {
 		// 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
@@ -354,13 +356,13 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	 * This test ensures that when the consumers retrieve some start offset from kafka (earliest, latest), that this offset
 	 * is committed to Kafka, even if some partitions are not read.
 	 *
-	 * Test:
+	 * <p>Test:
 	 * - Create 3 partitions
 	 * - write 50 messages into each.
 	 * - Start three consumers with auto.offset.reset='latest' and wait until they committed into Kafka.
 	 * - Check if the offsets in Kafka are set to 50 for the three partitions
 	 *
-	 * See FLINK-3440 as well
+	 * <p>See FLINK-3440 as well
 	 */
 	public void runAutoOffsetRetrievalAndCommitToKafka() throws Exception {
 		// 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
@@ -518,7 +520,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		env
 			.addSource(latestReadingConsumer).setParallelism(parallelism)
-			.flatMap(new FlatMapFunction<Tuple2<Integer,Integer>, Object>() {
+			.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Object>() {
 				@Override
 				public void flatMap(Tuple2<Integer, Integer> value, Collector<Object> out) throws Exception {
 					if (value.f1 - recordsInEachPartition < 0) {
@@ -605,12 +607,12 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	 * This test ensures that the consumer correctly uses group offsets in Kafka, and defaults to "auto.offset.reset"
 	 * behaviour when necessary, when explicitly configured to start from group offsets.
 	 *
-	 * The partitions and their committed group offsets are setup as:
+	 * <p>The partitions and their committed group offsets are setup as:
 	 * 	partition 0 --> committed offset 23
 	 * 	partition 1 --> no commit offset
 	 * 	partition 2 --> committed offset 43
 	 *
-	 * When configured to start from group offsets, each partition should read:
+	 * <p>When configured to start from group offsets, each partition should read:
 	 * 	partition 0 --> start from offset 23, read to offset 49 (27 records)
 	 * 	partition 1 --> default to "auto.offset.reset" (set to earliest), so start from offset 0, read to offset 49 (50 records)
 	 * 	partition 2 --> start from offset 43, read to offset 49 (7 records)
@@ -653,20 +655,20 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	 * start from specific offsets. For partitions which a specific offset can not be found for, the starting position
 	 * for them should fallback to the group offsets behaviour.
 	 *
-	 * 4 partitions will have 50 records with offsets 0 to 49. The supplied specific offsets map is:
+	 * <p>4 partitions will have 50 records with offsets 0 to 49. The supplied specific offsets map is:
 	 * 	partition 0 --> start from offset 19
 	 * 	partition 1 --> not set
 	 * 	partition 2 --> start from offset 22
 	 * 	partition 3 --> not set
 	 * 	partition 4 --> start from offset 26 (this should be ignored because the partition does not exist)
 	 *
-	 * The partitions and their committed group offsets are setup as:
+	 * <p>The partitions and their committed group offsets are setup as:
 	 * 	partition 0 --> committed offset 23
 	 * 	partition 1 --> committed offset 31
 	 * 	partition 2 --> committed offset 43
 	 * 	partition 3 --> no commit offset
 	 *
-	 * When configured to start from these specific offsets, each partition should read:
+	 * <p>When configured to start from these specific offsets, each partition should read:
 	 * 	partition 0 --> start from offset 19, read to offset 49 (31 records)
 	 * 	partition 1 --> fallback to group offsets, so start from offset 31, read to offset 49 (19 records)
 	 * 	partition 2 --> start from offset 22, read to offset 49 (28 records)
@@ -711,7 +713,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		kafkaOffsetHandler.close();
 		deleteTestTopic(topicName);
 	}
-	
+
 	/**
 	 * Ensure Kafka is working on both producer and consumer side.
 	 * This executes a job that contains two Flink pipelines.
@@ -719,22 +721,22 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	 * <pre>
 	 * (generator source) --> (kafka sink)-[KAFKA-TOPIC]-(kafka source) --> (validating sink)
 	 * </pre>
-	 * 
-	 * We need to externally retry this test. We cannot let Flink's retry mechanism do it, because the Kafka producer
+	 *
+	 * <p>We need to externally retry this test. We cannot let Flink's retry mechanism do it, because the Kafka producer
 	 * does not guarantee exactly-once output. Hence a recovery would introduce duplicates that
 	 * cause the test to fail.
 	 *
-	 * This test also ensures that FLINK-3156 doesn't happen again:
+	 * <p>This test also ensures that FLINK-3156 doesn't happen again:
 	 *
-	 * The following situation caused a NPE in the FlinkKafkaConsumer
+	 * <p>The following situation caused a NPE in the FlinkKafkaConsumer
 	 *
-	 * topic-1 <-- elements are only produced into topic1.
+	 * <p>topic-1 <-- elements are only produced into topic1.
 	 * topic-2
 	 *
-	 * Therefore, this test is consuming as well from an empty topic.
+	 * <p>Therefore, this test is consuming as well from an empty topic.
 	 *
 	 */
-	@RetryOnException(times=2, exception=kafka.common.NotLeaderForPartitionException.class)
+	@RetryOnException(times = 2, exception = kafka.common.NotLeaderForPartitionException.class)
 	public void runSimpleConcurrentProducerConsumerTopology() throws Exception {
 		final String topic = "concurrentProducerConsumerTopic_" + UUID.randomUUID().toString();
 		final String additionalEmptyTopic = "additionalEmptyTopic_" + UUID.randomUUID().toString();
@@ -763,7 +765,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		// ----------- add producer dataflow ----------
 
-		DataStream<Tuple2<Long, String>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Long,String>>() {
+		DataStream<Tuple2<Long, String>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Long, String>>() {
 
 			private boolean running = true;
 
@@ -772,7 +774,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 				int cnt = getRuntimeContext().getIndexOfThisSubtask() * elementsPerPartition;
 				int limit = cnt + elementsPerPartition;
 
-
 				while (running && cnt < limit) {
 					ctx.collect(new Tuple2<>(1000L + cnt, "kafka-" + cnt));
 					cnt++;
@@ -1002,13 +1003,11 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		FailingIdentityMapper.failedBefore = false;
 		tryExecute(env, "multi-source-one-partitions exactly once test");
 
-
 		deleteTestTopic(topic);
 	}
-	
-	
+
 	/**
-	 * Tests that the source can be properly canceled when reading full partitions. 
+	 * Tests that the source can be properly canceled when reading full partitions.
 	 */
 	public void runCancelingOnFullInputTest() throws Exception {
 		final String topic = "cancelingOnFullTopic";
@@ -1056,7 +1055,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		Thread.sleep(2000);
 
 		Throwable failueCause = jobError.get();
-		if(failueCause != null) {
+		if (failueCause != null) {
 			failueCause.printStackTrace();
 			Assert.fail("Test failed prematurely with: " + failueCause.getMessage());
 		}
@@ -1089,7 +1088,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	}
 
 	/**
-	 * Tests that the source can be properly canceled when reading empty partitions. 
+	 * Tests that the source can be properly canceled when reading empty partitions.
 	 */
 	public void runCancelingOnEmptyInputTest() throws Exception {
 		final String topic = "cancelingOnEmptyInputTopic";
@@ -1149,7 +1148,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	}
 
 	/**
-	 * Tests that the source can be properly canceled when reading full partitions. 
+	 * Tests that the source can be properly canceled when reading full partitions.
 	 */
 	public void runFailOnDeployTest() throws Exception {
 		final String topic = "failOnDeployTopic";
@@ -1198,19 +1197,19 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	}
 
 	/**
-	 * Test producing and consuming into multiple topics
+	 * Test producing and consuming into multiple topics.
 	 * @throws Exception
 	 */
 	public void runProduceConsumeMultipleTopics() throws Exception {
-		final int NUM_TOPICS = 5;
-		final int NUM_ELEMENTS = 20;
+		final int numTopics = 5;
+		final int numElements = 20;
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.getConfig().disableSysoutLogging();
-		
+
 		// create topics with content
 		final List<String> topics = new ArrayList<>();
-		for (int i = 0; i < NUM_TOPICS; i++) {
+		for (int i = 0; i < numTopics; i++) {
 			final String topic = "topic-" + i;
 			topics.add(topic);
 			// create topic
@@ -1227,8 +1226,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			public void run(SourceContext<Tuple3<Integer, Integer, String>> ctx) throws Exception {
 				int partition = getRuntimeContext().getIndexOfThisSubtask();
 
-				for (int topicId = 0; topicId < NUM_TOPICS; topicId++) {
-					for (int i = 0; i < NUM_ELEMENTS; i++) {
+				for (int topicId = 0; topicId < numTopics; topicId++) {
+					for (int i = 0; i < numElements; i++) {
 						ctx.collect(new Tuple3<>(partition, i, "topic-" + topicId));
 					}
 				}
@@ -1255,7 +1254,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		stream = env.addSource(kafkaServer.getConsumer(topics, schema, props));
 
 		stream.flatMap(new FlatMapFunction<Tuple3<Integer, Integer, String>, Integer>() {
-			Map<String, Integer> countPerTopic = new HashMap<>(NUM_TOPICS);
+			Map<String, Integer> countPerTopic = new HashMap<>(numTopics);
 			@Override
 			public void flatMap(Tuple3<Integer, Integer, String> value, Collector<Integer> out) throws Exception {
 				Integer count = countPerTopic.get(value.f2);
@@ -1268,10 +1267,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 				// check map:
 				for (Map.Entry<String, Integer> el: countPerTopic.entrySet()) {
-					if (el.getValue() < NUM_ELEMENTS) {
+					if (el.getValue() < numElements) {
 						break; // not enough yet
 					}
-					if (el.getValue() > NUM_ELEMENTS) {
+					if (el.getValue() > numElements) {
 						throw new RuntimeException("There is a failure in the test. I've read " +
 								el.getValue() + " from topic " + el.getKey());
 					}
@@ -1283,17 +1282,17 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		tryExecute(env, "Count elements from the topics");
 
-
 		// delete all topics again
-		for (int i = 0; i < NUM_TOPICS; i++) {
+		for (int i = 0; i < numTopics; i++) {
 			final String topic = "topic-" + i;
 			deleteTestTopic(topic);
 		}
 	}
 
 	/**
-	 * Test Flink's Kafka integration also with very big records (30MB)
-	 * see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
+	 * Test Flink's Kafka integration also with very big records (30MB).
+	 *
+	 * <p>see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
 	 *
 	 */
 	public void runBigRecordTestTopology() throws Exception {
@@ -1337,11 +1336,11 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 					if (elCnt == 11) {
 						throw new SuccessException();
 					} else {
-						throw new RuntimeException("There have been "+elCnt+" elements");
+						throw new RuntimeException("There have been " + elCnt + " elements");
 					}
 				}
 				if (elCnt > 10) {
-					throw new RuntimeException("More than 10 elements seen: "+elCnt);
+					throw new RuntimeException("More than 10 elements seen: " + elCnt);
 				}
 			}
 		});
@@ -1395,7 +1394,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		deleteTestTopic(topic);
 	}
 
-	
 	public void runBrokerFailureTest() throws Exception {
 		final String topic = "brokerFailureTestTopic";
 
@@ -1404,7 +1402,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		final int totalElements = parallelism * numElementsPerPartition;
 		final int failAfterElements = numElementsPerPartition / 3;
 
-
 		createTestTopic(topic, parallelism, 2);
 
 		DataGenerators.generateRandomizedIntegerSequence(
@@ -1417,7 +1414,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		LOG.info("Leader to shutdown {}", leaderId);
 
-
 		// run the topology (the consumers must handle the failures)
 
 		DeserializationSchema<Integer> schema =
@@ -1450,7 +1446,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	public void runKeyValueTest() throws Exception {
 		final String topic = "keyvaluetest";
 		createTestTopic(topic, 1, 1);
-		final int ELEMENT_COUNT = 5000;
+		final int elementCount = 5000;
 
 		// ----------- Write some data into Kafka -------------------
 
@@ -1463,7 +1459,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			@Override
 			public void run(SourceContext<Tuple2<Long, PojoValue>> ctx) throws Exception {
 				Random rnd = new Random(1337);
-				for (long i = 0; i < ELEMENT_COUNT; i++) {
+				for (long i = 0; i < elementCount; i++) {
 					PojoValue pojo = new PojoValue();
 					pojo.when = new Date(rnd.nextLong());
 					pojo.lon = rnd.nextLong();
@@ -1473,6 +1469,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 					ctx.collect(new Tuple2<>(key, pojo));
 				}
 			}
+
 			@Override
 			public void cancel() {
 			}
@@ -1491,26 +1488,25 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		env.setRestartStrategy(RestartStrategies.noRestart());
 		env.getConfig().disableSysoutLogging();
 
-
 		KeyedDeserializationSchema<Tuple2<Long, PojoValue>> readSchema = new TypeInformationKeyValueSerializationSchema<>(Long.class, PojoValue.class, env.getConfig());
 
 		Properties props = new Properties();
 		props.putAll(standardProps);
 		props.putAll(secureProps);
 		DataStream<Tuple2<Long, PojoValue>> fromKafka = env.addSource(kafkaServer.getConsumer(topic, readSchema, props));
-		fromKafka.flatMap(new RichFlatMapFunction<Tuple2<Long,PojoValue>, Object>() {
+		fromKafka.flatMap(new RichFlatMapFunction<Tuple2<Long, PojoValue>, Object>() {
 			long counter = 0;
 			@Override
 			public void flatMap(Tuple2<Long, PojoValue> value, Collector<Object> out) throws Exception {
 				// the elements should be in order.
-				Assert.assertTrue("Wrong value " + value.f1.lat, value.f1.lat == counter );
+				Assert.assertTrue("Wrong value " + value.f1.lat, value.f1.lat == counter);
 				if (value.f1.lat % 2 == 0) {
 					assertNull("key was not null", value.f0);
 				} else {
 					Assert.assertTrue("Wrong value " + value.f0, value.f0 == counter);
 				}
 				counter++;
-				if (counter == ELEMENT_COUNT) {
+				if (counter == elementCount) {
 					// we got the right number of elements
 					throw new SuccessException();
 				}
@@ -1522,22 +1518,21 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		deleteTestTopic(topic);
 	}
 
-	public static class PojoValue {
+	private static class PojoValue {
 		public Date when;
 		public long lon;
 		public long lat;
 		public PojoValue() {}
 	}
 
-
 	/**
-	 * Test delete behavior and metrics for producer
+	 * Test delete behavior and metrics for producer.
 	 * @throws Exception
 	 */
 	public void runAllDeletesTest() throws Exception {
 		final String topic = "alldeletestest";
 		createTestTopic(topic, 1, 1);
-		final int ELEMENT_COUNT = 300;
+		final int elementCount = 300;
 
 		// ----------- Write some data into Kafka -------------------
 
@@ -1550,12 +1545,13 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			@Override
 			public void run(SourceContext<Tuple2<byte[], PojoValue>> ctx) throws Exception {
 				Random rnd = new Random(1337);
-				for (long i = 0; i < ELEMENT_COUNT; i++) {
+				for (long i = 0; i < elementCount; i++) {
 					final byte[] key = new byte[200];
 					rnd.nextBytes(key);
 					ctx.collect(new Tuple2<>(key, (PojoValue) null));
 				}
 			}
+
 			@Override
 			public void cancel() {
 			}
@@ -1589,7 +1585,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 				// ensure that deleted messages are passed as nulls
 				assertNull(value.f1);
 				counter++;
-				if (counter == ELEMENT_COUNT) {
+				if (counter == elementCount) {
 					// we got the right number of elements
 					throw new SuccessException();
 				}
@@ -1608,8 +1604,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	 */
 	public void runEndOfStreamTest() throws Exception {
 
-		final int ELEMENT_COUNT = 300;
-		final String topic = writeSequence("testEndOfStream", ELEMENT_COUNT, 1, 1);
+		final int elementCount = 300;
+		final String topic = writeSequence("testEndOfStream", elementCount, 1, 1);
 
 		// read using custom schema
 		final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -1621,21 +1617,21 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		props.putAll(standardProps);
 		props.putAll(secureProps);
 
-		DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, new FixedNumberDeserializationSchema(ELEMENT_COUNT), props));
-		fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer,Integer>, Void>() {
+		DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, new FixedNumberDeserializationSchema(elementCount), props));
+		fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
 			@Override
 			public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {
 				// noop ;)
 			}
 		});
 
-		JobExecutionResult result = tryExecute(env1, "Consume " + ELEMENT_COUNT + " elements from Kafka");
+		JobExecutionResult result = tryExecute(env1, "Consume " + elementCount + " elements from Kafka");
 
 		deleteTestTopic(topic);
 	}
 
 	/**
-	 * Test metrics reporting for consumer
+	 * Test metrics reporting for consumer.
 	 *
 	 * @throws Exception
 	 */
@@ -1690,9 +1686,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 					kafkaServer.produceIntoKafka(fromGen, topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null);
 
 					env1.execute("Metrics test job");
-				} catch(Throwable t) {
+				} catch (Throwable t) {
 					LOG.warn("Got exception during execution", t);
-					if(!(t instanceof JobCancellationException)) { // we'll cancel the job
+					if (!(t instanceof JobCancellationException)) { // we'll cancel the job
 						error.f0 = t;
 					}
 				}
@@ -1723,7 +1719,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 				// check that offsets are correctly reported
 				for (ObjectName object : offsetMetrics) {
 					Object offset = mBeanServer.getAttribute(object, "Value");
-					if((long) offset >= 0) {
+					if ((long) offset >= 0) {
 						numPosOffsets++;
 					}
 				}
@@ -1738,7 +1734,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			Set<ObjectName> producerMetrics = mBeanServer.queryNames(new ObjectName("*KafkaProducer*:*"), null);
 			Assert.assertTrue("No producer metrics found", producerMetrics.size() > 30);
 
-
 			LOG.info("Found all JMX metrics. Cancelling job.");
 		} finally {
 			// cancel
@@ -1755,12 +1750,11 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		deleteTestTopic(topic);
 	}
 
+	private static class FixedNumberDeserializationSchema implements DeserializationSchema<Tuple2<Integer, Integer>> {
 
-	public static class FixedNumberDeserializationSchema implements DeserializationSchema<Tuple2<Integer, Integer>> {
-		
 		final int finalCount;
 		int count = 0;
-		
+
 		TypeInformation<Tuple2<Integer, Integer>> ti = TypeInfoParser.parse("Tuple2<Integer, Integer>");
 		TypeSerializer<Tuple2<Integer, Integer>> ser = ti.createSerializer(new ExecutionConfig());
 
@@ -1785,7 +1779,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		}
 	}
 
-
 	// ------------------------------------------------------------------------
 	//  Reading writing test data sets
 	// ------------------------------------------------------------------------
@@ -1916,13 +1909,12 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			String baseTopicName,
 			final int numElements,
 			final int parallelism,
-			final int replicationFactor) throws Exception
-	{
+			final int replicationFactor) throws Exception {
 		LOG.info("\n===================================\n" +
 				"== Writing sequence of " + numElements + " into " + baseTopicName + " with p=" + parallelism + "\n" +
 				"===================================");
 
-		final TypeInformation<Tuple2<Integer, Integer>> resultType = 
+		final TypeInformation<Tuple2<Integer, Integer>> resultType =
 				TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});
 
 		final KeyedSerializationSchema<Tuple2<Integer, Integer>> serSchema =
@@ -1932,15 +1924,15 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		final KeyedDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
 				new KeyedDeserializationSchemaWrapper<>(
 						new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
-		
+
 		final int maxNumAttempts = 10;
 
 		for (int attempt = 1; attempt <= maxNumAttempts; attempt++) {
-			
+
 			final String topicName = baseTopicName + '-' + attempt;
-			
+
 			LOG.info("Writing attempt #1");
-			
+
 			// -------- Write the Sequence --------
 
 			createTestTopic(topicName, parallelism, replicationFactor);
@@ -1948,33 +1940,33 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			StreamExecutionEnvironment writeEnv = StreamExecutionEnvironment.getExecutionEnvironment();
 			writeEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
 			writeEnv.getConfig().disableSysoutLogging();
-			
+
 			DataStream<Tuple2<Integer, Integer>> stream = writeEnv.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
-	
+
 				private boolean running = true;
-	
+
 				@Override
 				public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
 					int cnt = 0;
 					int partition = getRuntimeContext().getIndexOfThisSubtask();
-	
+
 					while (running && cnt < numElements) {
 						ctx.collect(new Tuple2<>(partition, cnt));
 						cnt++;
 					}
 				}
-	
+
 				@Override
 				public void cancel() {
 					running = false;
 				}
 			}).setParallelism(parallelism);
-	
+
 			// the producer must not produce duplicates
 			Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
 			producerProperties.setProperty("retries", "0");
 			producerProperties.putAll(secureProps);
-			
+
 			kafkaServer.produceIntoKafka(stream, topicName, serSchema, producerProperties, new Tuple2FlinkPartitioner(parallelism))
 					.setParallelism(parallelism);
 
@@ -1987,21 +1979,21 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 				JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
 				continue;
 			}
-			
+
 			LOG.info("Finished writing sequence");
 
 			// -------- Validate the Sequence --------
-			
+
 			// we need to validate the sequence, because kafka's producers are not exactly once
 			LOG.info("Validating sequence");
 
 			JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
-			
+
 			final StreamExecutionEnvironment readEnv = StreamExecutionEnvironment.getExecutionEnvironment();
 			readEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
 			readEnv.getConfig().disableSysoutLogging();
 			readEnv.setParallelism(parallelism);
-			
+
 			Properties readProps = (Properties) standardProps.clone();
 			readProps.setProperty("group.id", "flink-tests-validator");
 			readProps.putAll(secureProps);
@@ -2010,10 +2002,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			readEnv
 					.addSource(consumer)
 					.map(new RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
-						
+
 						private final int totalCount = parallelism * numElements;
 						private int count = 0;
-						
+
 						@Override
 						public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
 							if (++count == totalCount) {
@@ -2024,9 +2016,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 						}
 					}).setParallelism(1)
 					.addSink(new DiscardingSink<Tuple2<Integer, Integer>>()).setParallelism(1);
-			
+
 			final AtomicReference<Throwable> errorRef = new AtomicReference<>();
-			
+
 			Thread runner = new Thread() {
 				@Override
 				public void run() {
@@ -2038,15 +2030,15 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 				}
 			};
 			runner.start();
-			
+
 			final long deadline = System.nanoTime() + 10_000_000_000L;
 			long delay;
 			while (runner.isAlive() && (delay = deadline - System.nanoTime()) > 0) {
-				runner.join(delay/1_000_000L);
+				runner.join(delay / 1_000_000L);
 			}
-			
+
 			boolean success;
-			
+
 			if (runner.isAlive()) {
 				// did not finish in time, maybe the producer dropped one or more records and
 				// the validation did not reach the exit point
@@ -2065,7 +2057,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			}
 
 			JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
-			
+
 			if (success) {
 				// everything is good!
 				return topicName;
@@ -2075,7 +2067,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 				// fall through the loop
 			}
 		}
-		
+
 		throw new Exception("Could not write a valid sequence to Kafka after " + maxNumAttempts + " attempts");
 	}
 
@@ -2090,28 +2082,28 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(config);
 		// we request only one stream per consumer instance. Kafka will make sure that each consumer group
 		// will see each message only once.
-		Map<String,Integer> topicCountMap = Collections.singletonMap(topicName, 1);
+		Map<String, Integer> topicCountMap = Collections.singletonMap(topicName, 1);
 		Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumerConnector.createMessageStreams(topicCountMap);
 		if (streams.size() != 1) {
-			throw new RuntimeException("Expected only one message stream but got "+streams.size());
+			throw new RuntimeException("Expected only one message stream but got " + streams.size());
 		}
 		List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName);
 		if (kafkaStreams == null) {
-			throw new RuntimeException("Requested stream not available. Available streams: "+streams.toString());
+			throw new RuntimeException("Requested stream not available. Available streams: " + streams.toString());
 		}
 		if (kafkaStreams.size() != 1) {
-			throw new RuntimeException("Requested 1 stream from Kafka, bot got "+kafkaStreams.size()+" streams");
+			throw new RuntimeException("Requested 1 stream from Kafka, bot got " + kafkaStreams.size() + " streams");
 		}
 		LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, config.groupId());
 		ConsumerIterator<byte[], byte[]> iteratorToRead = kafkaStreams.get(0).iterator();
 
 		List<MessageAndMetadata<byte[], byte[]>> result = new ArrayList<>();
 		int read = 0;
-		while(iteratorToRead.hasNext()) {
+		while (iteratorToRead.hasNext()) {
 			read++;
 			result.add(iteratorToRead.next());
 			if (read == stopAfter) {
-				LOG.info("Read "+read+" elements");
+				LOG.info("Read " + read + " elements");
 				return result;
 			}
 		}
@@ -2131,12 +2123,11 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		}
 	}
 
-	private static void printTopic(String topicName, int elements,DeserializationSchema<?> deserializer) 
-			throws IOException
-	{
+	private static void printTopic(String topicName, int elements, DeserializationSchema<?> deserializer)
+			throws IOException {
 		// write the sequence to log for debugging purposes
 		Properties newProps = new Properties(standardProps);
-		newProps.setProperty("group.id", "topic-printer"+ UUID.randomUUID().toString());
+		newProps.setProperty("group.id", "topic-printer" + UUID.randomUUID().toString());
 		newProps.setProperty("auto.offset.reset", "smallest");
 		newProps.setProperty("zookeeper.connect", standardProps.getProperty("zookeeper.connect"));
 		newProps.putAll(secureProps);
@@ -2145,15 +2136,14 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		printTopic(topicName, printerConfig, deserializer, elements);
 	}
 
-
-	public static class BrokerKillingMapper<T> extends RichMapFunction<T,T>
+	private static class BrokerKillingMapper<T> extends RichMapFunction<T, T>
 			implements ListCheckpointed<Integer>, CheckpointListener {
 
 		private static final long serialVersionUID = 6334389850158707313L;
 
 		public static volatile boolean killedLeaderBefore;
 		public static volatile boolean hasBeenCheckpointedBeforeFailure;
-		
+
 		private final int shutdownBrokerId;
 		private final int failCount;
 		private int numElementsTotal;
@@ -2174,10 +2164,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		@Override
 		public T map(T value) throws Exception {
 			numElementsTotal++;
-			
+
 			if (!killedLeaderBefore) {
 				Thread.sleep(10);
-				
+
 				if (failer && numElementsTotal >= failCount) {
 					// shut down a Kafka broker
 					KafkaServer toShutDown = null;
@@ -2188,14 +2178,14 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 							break;
 						}
 					}
-	
+
 					if (toShutDown == null) {
 						StringBuilder listOfBrokers = new StringBuilder();
 						for (KafkaServer server : kafkaServer.getBrokers()) {
 							listOfBrokers.append(kafkaServer.getBrokerId(server));
 							listOfBrokers.append(" ; ");
 						}
-						
+
 						throw new Exception("Cannot find broker to shut down: " + shutdownBrokerId
 								+ " ; available brokers: " + listOfBrokers.toString());
 					}
@@ -2266,7 +2256,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			try {
 				ts.serialize(new Tuple2<>(element.f0, element.f1), out);
 			} catch (IOException e) {
-				throw new RuntimeException("Error" ,e);
+				throw new RuntimeException("Error" , e);
 			}
 			return by.toByteArray();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index bcc8328..e292e13 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -43,6 +43,9 @@ import static org.apache.flink.test.util.TestUtils.tryExecute;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
+/**
+ * Abstract test base for all Kafka producer tests.
+ */
 @SuppressWarnings("serial")
 public abstract class KafkaProducerTestBase extends KafkaTestBase {
 
@@ -50,7 +53,7 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 	 * This tests verifies that custom partitioning works correctly, with a default topic
 	 * and dynamic topic. The number of partitions for each topic is deliberately different.
 	 *
-	 * Test topology:
+	 * <p>Test topology:
 	 *
 	 * <pre>
 	 *             +------> (sink) --+--> [DEFAULT_TOPIC-1] --> (source) -> (map) -----+
@@ -67,11 +70,11 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 	 *            \                  |                             |          |        |
 	 *             +------> (sink) --+--> [DYNAMIC_TOPIC-3] --> (source) -> (map) -----+
 	 * </pre>
-	 * 
-	 * Each topic has an independent mapper that validates the values come consistently from
+	 *
+	 * <p>Each topic has an independent mapper that validates the values come consistently from
 	 * the correct Kafka partition of the topic is is responsible of.
-	 * 
-	 * Each topic also has a final sink that validates that there are no duplicates and that all
+	 *
+	 * <p>Each topic also has a final sink that validates that there are no duplicates and that all
 	 * partitions are present.
 	 */
 	public void runCustomPartitioningTest() {
@@ -171,7 +174,7 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 
 	// ------------------------------------------------------------------------
 
-	public static class CustomPartitioner extends FlinkKafkaPartitioner<Tuple2<Long, String>> implements Serializable {
+	private static class CustomPartitioner extends FlinkKafkaPartitioner<Tuple2<Long, String>> implements Serializable {
 
 		private final Map<String, Integer> expectedTopicsToNumPartitions;
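
The custom-partitioning test above is built around FlinkKafkaPartitioner, whose partition() method receives the record, its serialized key and value, the target topic, and the array of available partitions (the same signature exercised by the fixed-partitioner test further down). As an illustration only, not taken from this patch, a partitioner of that shape that spreads Tuple2<Long, String> records over whatever partitions the target topic has could look like this:

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

    public class KeyModuloPartitioner extends FlinkKafkaPartitioner<Tuple2<Long, String>> {

        private static final long serialVersionUID = 1L;

        @Override
        public int partition(Tuple2<Long, String> record, byte[] key, byte[] value,
                String targetTopic, int[] partitions) {
            // Deterministically map the record's long key onto one of the
            // partitions that actually exist for the target topic.
            return partitions[(int) Math.floorMod(record.f0, (long) partitions.length)];
        }
    }

Such an instance is handed to the producer in place of the default fixed partitioner, which is essentially what the test's CustomPartitioner does, plus validation of the expected partition counts per topic.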
 

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index f688660..d5c9276 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -37,7 +37,6 @@ import org.apache.flink.util.InstantiationUtil;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-
 import org.junit.ClassRule;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
@@ -58,7 +57,7 @@ import static org.junit.Assert.fail;
  */
 @SuppressWarnings("serial")
 public class KafkaShortRetentionTestBase implements Serializable {
-	
+
 	protected static final Logger LOG = LoggerFactory.getLogger(KafkaShortRetentionTestBase.class);
 
 	protected static final int NUM_TMS = 1;
@@ -66,7 +65,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
 	protected static final int TM_SLOTS = 8;
 
 	protected static final int PARALLELISM = NUM_TMS * TM_SLOTS;
-	
+
 	private static KafkaTestEnvironment kafkaServer;
 	private static Properties standardProps;
 	private static LocalFlinkMiniCluster flink;
@@ -90,7 +89,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
 
 		LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion());
 
-		if(kafkaServer.isSecureRunSupported()) {
+		if (kafkaServer.isSecureRunSupported()) {
 			secureProps = kafkaServer.getSecureProperties();
 		}
 
@@ -151,10 +150,8 @@ public class KafkaShortRetentionTestBase implements Serializable {
 		env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately
 		env.getConfig().disableSysoutLogging();
 
-
 		// ----------- add producer dataflow ----------
 
-
 		DataStream<String> stream = env.addSource(new RichParallelSourceFunction<String>() {
 
 			private boolean running = true;
@@ -164,7 +161,6 @@ public class KafkaShortRetentionTestBase implements Serializable {
 				int cnt = getRuntimeContext().getIndexOfThisSubtask() * elementsPerPartition;
 				int limit = cnt + elementsPerPartition;
 
-
 				while (running && !stopProducer && cnt < limit) {
 					ctx.collect("element-" + cnt);
 					cnt++;
@@ -196,14 +192,13 @@ public class KafkaShortRetentionTestBase implements Serializable {
 		kafkaServer.deleteTestTopic(topic);
 	}
 
-	
 	private class NonContinousOffsetsDeserializationSchema implements KeyedDeserializationSchema<String> {
 		private int numJumps;
 		long nextExpected = 0;
 
 		@Override
 		public String deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
-			if(offset != nextExpected) {
+			if (offset != nextExpected) {
 				numJumps++;
 				nextExpected = offset;
 				LOG.info("Registered now jump at offset {}", offset);
@@ -219,7 +214,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
 
 		@Override
 		public boolean isEndOfStream(String nextElement) {
-			if( numJumps >= 5) {
+			if (numJumps >= 5) {
 				// we saw 5 jumps and no failures --> consumer can handle auto.offset.reset
 				stopProducer = true;
 				return true;
@@ -233,15 +228,14 @@ public class KafkaShortRetentionTestBase implements Serializable {
 		}
 	}
 
-
 	/**
-	 * Ensure that the consumer is properly failing if "auto.offset.reset" is set to "none"
+	 * Ensure that the consumer is properly failing if "auto.offset.reset" is set to "none".
 	 * @throws Exception
 	 */
 	public void runFailOnAutoOffsetResetNone() throws Exception {
 		final String topic = "auto-offset-reset-none-test";
 		final int parallelism = 1;
-		
+
 		kafkaServer.createTestTopic(topic, parallelism, 1);
 
 		final StreamExecutionEnvironment env =
@@ -249,7 +243,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
 		env.setParallelism(parallelism);
 		env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately
 		env.getConfig().disableSysoutLogging();
-		
+
 		// ----------- add consumer ----------
 
 		Properties customProps = new Properties();
@@ -263,10 +257,10 @@ public class KafkaShortRetentionTestBase implements Serializable {
 
 		try {
 			env.execute("Test auto offset reset none");
-		} catch(Throwable e) {
+		} catch (Throwable e) {
 			// check if correct exception has been thrown
-			if(!e.getCause().getCause().getMessage().contains("Unable to find previous offset")  // kafka 0.8
-			 && !e.getCause().getCause().getMessage().contains("Undefined offset with no reset policy for partition") // kafka 0.9
+			if (!e.getCause().getCause().getMessage().contains("Unable to find previous offset")  // kafka 0.8
+				&& !e.getCause().getCause().getMessage().contains("Undefined offset with no reset policy for partition") // kafka 0.9
 					) {
 				throw e;
 			}
@@ -287,7 +281,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
 		customProps.putAll(standardProps);
 		customProps.putAll(secureProps);
 		customProps.setProperty("auto.offset.reset", "none"); // test that "none" leads to an exception
-		
+
 		try {
 			kafkaServer.getConsumer(topic, new SimpleStringSchema(), customProps);
 			fail("should fail with an exception");
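
The "auto.offset.reset=none" tests above expect a failure precisely because "none" forbids the consumer from picking a start position when no committed offset exists (the diff checks for the Kafka 0.8 and 0.9 variants of that error message). A rough standalone sketch of the underlying behaviour with a plain Kafka 0.9+ client, independent of this patch and with placeholder broker, group, and topic names:

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class OffsetResetNoneSketch {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "fresh-group-without-committed-offsets");
            props.put("auto.offset.reset", "none"); // neither earliest nor latest
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("auto-offset-reset-none-test"));
                consumer.poll(1000); // no committed offset and no reset policy -> failure
            } catch (NoOffsetForPartitionException e) {
                // Roughly the "Undefined offset with no reset policy for partition"
                // condition that the Flink test looks for in the wrapped exception.
                System.out.println("Got expected failure: " + e.getMessage());
            }
        }
    }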

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
index d4fe9cc..dcf3167 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -15,9 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.connectors.kafka;
 
-import java.util.Properties;
+package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -27,8 +26,11 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWra
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.table.api.Types;
 import org.apache.flink.types.Row;
+
 import org.junit.Test;
 
+import java.util.Properties;
+
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotSame;
@@ -38,6 +40,9 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 
+/**
+ * Abstract test base for all Kafka table sink tests.
+ */
 public abstract class KafkaTableSinkTestBase {
 
 	private static final String TOPIC = "testTopic";
@@ -46,7 +51,7 @@ public abstract class KafkaTableSinkTestBase {
 	private static final FlinkKafkaPartitioner<Row> PARTITIONER = new CustomPartitioner();
 	private static final Properties PROPERTIES = createSinkProperties();
 	@SuppressWarnings("unchecked")
-	private final FlinkKafkaProducerBase<Row> PRODUCER = new FlinkKafkaProducerBase<Row>(
+	private final FlinkKafkaProducerBase<Row> producer = new FlinkKafkaProducerBase<Row>(
 		TOPIC, new KeyedSerializationSchemaWrapper(getSerializationSchema()), PROPERTIES, PARTITIONER) {
 
 		@Override
@@ -61,7 +66,7 @@ public abstract class KafkaTableSinkTestBase {
 		KafkaTableSink kafkaTableSink = spy(createTableSink());
 		kafkaTableSink.emitDataStream(dataStream);
 
-		verify(dataStream).addSink(eq(PRODUCER));
+		verify(dataStream).addSink(eq(producer));
 
 		verify(kafkaTableSink).createKafkaProducer(
 			eq(TOPIC),
@@ -87,7 +92,7 @@ public abstract class KafkaTableSinkTestBase {
 	protected abstract SerializationSchema<Row> getSerializationSchema();
 
 	private KafkaTableSink createTableSink() {
-		return createTableSink(TOPIC, PROPERTIES, PARTITIONER, PRODUCER);
+		return createTableSink(TOPIC, PROPERTIES, PARTITIONER, producer);
 	}
 
 	private static Properties createSinkProperties() {

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
index 341df45..8028bfc 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
@@ -18,23 +18,29 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import java.util.Properties;
-import org.apache.avro.Schema;
-import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.testutils.AvroTestUtils;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.table.api.Types;
 import org.apache.flink.types.Row;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificRecordBase;
 import org.junit.Test;
+
+import java.util.Properties;
+
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 
+/**
+ * Abstract test base for all Kafka table sources.
+ */
 public abstract class KafkaTableSourceTestBase {
 
 	private static final String TOPIC = "testTopic";
@@ -47,10 +53,14 @@ public abstract class KafkaTableSourceTestBase {
 		BasicTypeInfo.LONG_TYPE_INFO };
 	private static final Properties PROPERTIES = createSourceProperties();
 
-	// Avro record that matches above schema
+	/**
+	 * Avro record that matches above schema.
+	 */
 	public static class AvroSpecificRecord extends SpecificRecordBase {
 
+		//CHECKSTYLE.OFF: StaticVariableNameCheck - Avro accesses this field by name via reflection.
 		public static Schema SCHEMA$ = AvroTestUtils.createFlatAvroSchema(FIELD_NAMES, FIELD_TYPES);
+		//CHECKSTYLE.ON: StaticVariableNameCheck
 
 		public Long mylong;
 		public String mystring;

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 1837af6..c484a4b 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -33,17 +33,15 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-
 import org.junit.ClassRule;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import scala.concurrent.duration.FiniteDuration;
-
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * The base for the Kafka tests. It brings up:
@@ -52,7 +50,7 @@ import java.util.concurrent.TimeUnit;
  *     <li>Three Kafka Brokers (mini clusters)</li>
  *     <li>A Flink mini cluster</li>
  * </ul>
- * 
+ *
  * <p>Code in this test is based on the following GitHub repository:
  * <a href="https://github.com/sakserv/hadoop-mini-clusters">
  *   https://github.com/sakserv/hadoop-mini-clusters</a> (ASL licensed),
@@ -62,7 +60,7 @@ import java.util.concurrent.TimeUnit;
 public abstract class KafkaTestBase extends TestLogger {
 
 	protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestBase.class);
-	
+
 	protected static final int NUMBER_OF_KAFKA_SERVERS = 3;
 
 	protected static final int NUM_TMS = 1;
@@ -74,7 +72,7 @@ public abstract class KafkaTestBase extends TestLogger {
 	protected static String brokerConnectionStrings;
 
 	protected static Properties standardProps;
-	
+
 	protected static LocalFlinkMiniCluster flink;
 
 	protected static FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
@@ -89,7 +87,7 @@ public abstract class KafkaTestBase extends TestLogger {
 	// ------------------------------------------------------------------------
 	//  Setup and teardown of the mini clusters
 	// ------------------------------------------------------------------------
-	
+
 	@BeforeClass
 	public static void prepare() throws ClassNotFoundException {
 
@@ -162,7 +160,7 @@ public abstract class KafkaTestBase extends TestLogger {
 			flink.shutdown();
 		}
 
-		if(secureProps != null) {
+		if (secureProps != null) {
 			secureProps.clear();
 		}
 
@@ -170,12 +168,9 @@ public abstract class KafkaTestBase extends TestLogger {
 
 	}
 
-
-
 	// ------------------------------------------------------------------------
 	//  Execution utilities
 	// ------------------------------------------------------------------------
-	
 
 	protected static void tryExecutePropagateExceptions(StreamExecutionEnvironment see, String name) throws Exception {
 		try {
@@ -200,7 +195,7 @@ public abstract class KafkaTestBase extends TestLogger {
 	protected static void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) {
 		kafkaServer.createTestTopic(topic, numberOfPartitions, replicationFactor);
 	}
-	
+
 	protected static void deleteTestTopic(String topic) {
 		kafkaServer.deleteTestTopic(topic);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 311a1a4..4df3465 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -17,11 +17,6 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-
-import kafka.server.KafkaServer;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.operators.StreamSink;
@@ -31,8 +26,14 @@ import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 
+import kafka.server.KafkaServer;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
 /**
- * Abstract class providing a Kafka test environment
+ * Abstract class providing a Kafka test environment.
  */
 public abstract class KafkaTestEnvironment {
 
@@ -89,9 +90,14 @@ public abstract class KafkaTestEnvironment {
 
 	// -- offset handlers
 
+	/**
+	 * Simple interface to commit and retrieve offsets.
+	 */
 	public interface KafkaOffsetHandler {
 		Long getCommittedOffset(String topicName, int partition);
+
 		void setCommittedOffset(String topicName, int partition, long offset);
+
 		void close();
 	}
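
The KafkaOffsetHandler interface above is implemented separately by each version-specific test environment. Purely as an illustration (not the implementation used in the 0.8/0.9/0.10 modules), it could be backed by a plain Kafka 0.9+ consumer, assuming byte-array deserializers on top of the standard test properties:

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class SimpleKafkaOffsetHandler implements KafkaTestEnvironment.KafkaOffsetHandler {

        private final KafkaConsumer<byte[], byte[]> offsetClient;

        public SimpleKafkaOffsetHandler(Properties standardProps) {
            Properties props = new Properties();
            props.putAll(standardProps);
            props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            this.offsetClient = new KafkaConsumer<>(props);
        }

        @Override
        public Long getCommittedOffset(String topicName, int partition) {
            OffsetAndMetadata committed = offsetClient.committed(new TopicPartition(topicName, partition));
            return committed != null ? committed.offset() : null;
        }

        @Override
        public void setCommittedOffset(String topicName, int partition, long offset) {
            offsetClient.commitSync(Collections.singletonMap(
                    new TopicPartition(topicName, partition), new OffsetAndMetadata(offset)));
        }

        @Override
        public void close() {
            offsetClient.close();
        }
    }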
 

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkFixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkFixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkFixedPartitioner.java
deleted file mode 100644
index fa84199..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkFixedPartitioner.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TestFlinkFixedPartitioner {
-
-
-	/**
-	 * <pre>
-	 *   		Flink Sinks:		Kafka Partitions
-	 * 			1	---------------->	1
-	 * 			2   --------------/
-	 * 			3   -------------/
-	 * 			4	------------/
-	 * </pre>
-	 */
-	@Test
-	public void testMoreFlinkThanBrokers() {
-		FlinkFixedPartitioner<String> part = new FlinkFixedPartitioner<>();
-
-		int[] partitions = new int[]{0};
-
-		part.open(0, 4);
-		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
-
-		part.open(1, 4);
-		Assert.assertEquals(0, part.partition("abc2", null, null, null, partitions));
-
-		part.open(2, 4);
-		Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions));
-		Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions)); // check if it is changing ;)
-
-		part.open(3, 4);
-		Assert.assertEquals(0, part.partition("abc4", null, null, null, partitions));
-	}
-
-	/**
-	 *
-	 * <pre>
-	 * 		Flink Sinks:		Kafka Partitions
-	 * 			1	---------------->	1
-	 * 			2	---------------->	2
-	 * 									3
-	 * 									4
-	 * 									5
-	 *
-	 * </pre>
-	 */
-	@Test
-	public void testFewerPartitions() {
-		FlinkFixedPartitioner<String> part = new FlinkFixedPartitioner<>();
-
-		int[] partitions = new int[]{0, 1, 2, 3, 4};
-		part.open(0, 2);
-		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
-		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
-
-		part.open(1, 2);
-		Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
-		Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
-	}
-
-	/*
-	 * 		Flink Sinks:		Kafka Partitions
-	 * 			1	------------>--->	1
-	 * 			2	-----------/----> 	2
-	 * 			3	----------/
-	 */
-	@Test
-	public void testMixedCase() {
-		FlinkFixedPartitioner<String> part = new FlinkFixedPartitioner<>();
-		int[] partitions = new int[]{0,1};
-
-		part.open(0, 3);
-		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
-
-		part.open(1, 3);
-		Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
-
-		part.open(2, 3);
-		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
-
-	}
-
-}

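The removed tests above all assert the same assignment rule: a sink subtask is pinned to partitions[subtaskIndex % partitions.length]. Below is a small, self-contained sketch of that rule, an illustration of what the deleted assertions encoded rather than the partitioner's actual source.

// Sketch of the fixed subtask-to-partition assignment the deleted tests verified.
public final class FixedAssignmentSketch {

	private FixedAssignmentSketch() {
	}

	static int assign(int subtaskIndex, int[] partitions) {
		return partitions[subtaskIndex % partitions.length];
	}

	public static void main(String[] args) {
		int[] partitions = {0, 1};
		// mirrors testMixedCase: three subtasks against two partitions
		System.out.println(assign(0, partitions)); // 0
		System.out.println(assign(1, partitions)); // 1
		System.out.println(assign(2, partitions)); // 0
	}
}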
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
index c1a64c4..c83a97e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
@@ -23,13 +23,14 @@ import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.util.SerializedValue;
 
 import org.junit.Test;
 
 import javax.annotation.Nullable;
+
 import java.util.HashMap;
 import java.util.Map;
 
@@ -38,6 +39,9 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
+/**
+ * Tests for the {@link AbstractFetcher}.
+ */
 @SuppressWarnings("serial")
 public class AbstractFetcherTest {
 
@@ -191,7 +195,7 @@ public class AbstractFetcherTest {
 		final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitionStates()[2];
 
 		// elements generate a watermark if the timestamp is a multiple of three
-		
+
 		// elements for partition 1
 		fetcher.emitRecord(1L, part1, 1L);
 		fetcher.emitRecord(2L, part1, 2L);
@@ -211,11 +215,11 @@ public class AbstractFetcherTest {
 		fetcher.emitRecord(102L, part3, 2L);
 		assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
 		assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
-		
+
 		// now, we should have a watermark
 		assertTrue(sourceContext.hasWatermark());
 		assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
-		
+
 		// advance partition 3
 		fetcher.emitRecord(1003L, part3, 3L);
 		fetcher.emitRecord(1004L, part3, 4L);
@@ -239,7 +243,7 @@ public class AbstractFetcherTest {
 		assertTrue(sourceContext.hasWatermark());
 		assertEquals(15L, sourceContext.getLatestWatermark().getTimestamp());
 	}
-	
+
 	@Test
 	public void testPeriodicWatermarks() throws Exception {
 		final String testTopic = "test topic name";
@@ -329,8 +333,7 @@ public class AbstractFetcherTest {
 				SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 				SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
 				ProcessingTimeService processingTimeProvider,
-				long autoWatermarkInterval) throws Exception
-		{
+				long autoWatermarkInterval) throws Exception {
 			super(
 				sourceContext,
 				assignedPartitionsWithStartOffsets,
@@ -391,7 +394,6 @@ public class AbstractFetcherTest {
 			}
 		}
 
-
 		@Override
 		public void markAsTemporarilyIdle() {
 			throw new UnsupportedOperationException();
@@ -412,7 +414,7 @@ public class AbstractFetcherTest {
 		public boolean hasWatermark() {
 			return currentWatermark != null;
 		}
-		
+
 		public Watermark getLatestWatermark() throws InterruptedException {
 			synchronized (watermarkLock) {
 				while (currentWatermark == null) {
@@ -430,7 +432,7 @@ public class AbstractFetcherTest {
 	private static class PeriodicTestExtractor implements AssignerWithPeriodicWatermarks<Long> {
 
 		private volatile long maxTimestamp = Long.MIN_VALUE;
-		
+
 		@Override
 		public long extractTimestamp(Long element, long previousElementTimestamp) {
 			maxTimestamp = Math.max(maxTimestamp, element);
@@ -456,6 +458,6 @@ public class AbstractFetcherTest {
 		public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) {
 			return extractedTimestamp % 3 == 0 ? new Watermark(extractedTimestamp) : null;
 		}
-		
+
 	}
 }

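The punctuated-watermark assertions above hinge on one rule: the fetcher forwards the minimum of the per-partition watermarks, so a lagging partition holds the overall watermark back. A minimal illustration of that aggregation follows; it is an assumption-level sketch, not the fetcher's code.

// Sketch: the emitted watermark is the minimum over all partition-local watermarks.
public final class WatermarkAggregationSketch {

	static long aggregate(long[] partitionWatermarks) {
		long min = Long.MAX_VALUE;
		for (long wm : partitionWatermarks) {
			min = Math.min(min, wm);
		}
		return min;
	}

	public static void main(String[] args) {
		// per-partition watermarks 3, 12 and 102 yield an overall watermark of 3,
		// matching the assertEquals(3L, ...) check in the test above
		System.out.println(aggregate(new long[]{3L, 12L, 102L}));
	}
}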
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java
index b215bd3..4496a26 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java
@@ -27,8 +27,11 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+/**
+ * Tests for the {@link KafkaTopicPartition}.
+ */
 public class KafkaTopicPartitionTest {
-	
+
 	@Test
 	public void validateUid() {
 		Field uidField;
@@ -40,14 +43,14 @@ public class KafkaTopicPartitionTest {
 			fail("serialVersionUID is not defined");
 			return;
 		}
-		
+
 		assertTrue(Modifier.isStatic(uidField.getModifiers()));
 		assertTrue(Modifier.isFinal(uidField.getModifiers()));
 		assertTrue(Modifier.isPrivate(uidField.getModifiers()));
-		
+
 		assertEquals(long.class, uidField.getType());
-		
-		// the UID has to be constant to make sure old checkpoints/savepoints can be read 
+
+		// the UID has to be constant to make sure old checkpoints/savepoints can be read
 		try {
 			assertEquals(722083576322742325L, uidField.getLong(null));
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/AvroTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/AvroTestUtils.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/AvroTestUtils.java
index 075b79b..a41125a 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/AvroTestUtils.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/AvroTestUtils.java
@@ -18,13 +18,6 @@
 
 package org.apache.flink.streaming.connectors.kafka.testutils;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.specific.SpecificRecord;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.io.avro.generated.Address;
 import org.apache.flink.api.io.avro.generated.Colors;
@@ -32,12 +25,21 @@ import org.apache.flink.api.io.avro.generated.User;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.types.Row;
 
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.specific.SpecificRecord;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+
 /**
  * Utilities for creating Avro Schemas.
  */
 public final class AvroTestUtils {
 
-	private static String NAMESPACE = "org.apache.flink.streaming.connectors.kafka";
+	private static final String NAMESPACE = "org.apache.flink.streaming.connectors.kafka";
 
 	/**
 	 * Creates a flat Avro Schema for testing.

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index c0fb836..b204ea9 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -18,10 +18,6 @@
 
 package org.apache.flink.streaming.connectors.kafka.testutils;
 
-import java.util.Collection;
-import java.util.Properties;
-import java.util.Random;
-
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -43,14 +39,22 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWra
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 
+import java.util.Collection;
+import java.util.Properties;
+import java.util.Random;
+
+/**
+ * Test data generators.
+ */
 @SuppressWarnings("serial")
 public class DataGenerators {
 
-	public static void generateRandomizedIntegerSequence(StreamExecutionEnvironment env,
-														 KafkaTestEnvironment testServer, String topic,
-														 final int numPartitions,
-														 final int numElements,
-														 final boolean randomizeOrder) throws Exception {
+	public static void generateRandomizedIntegerSequence(
+			StreamExecutionEnvironment env,
+			KafkaTestEnvironment testServer, String topic,
+			final int numPartitions,
+			final int numElements,
+			final boolean randomizeOrder) throws Exception {
 		env.setParallelism(numPartitions);
 		env.getConfig().disableSysoutLogging();
 		env.setRestartStrategy(RestartStrategies.noRestart());
@@ -65,8 +69,8 @@ public class DataGenerators {
 						// create a sequence
 						int[] elements = new int[numElements];
 						for (int i = 0, val = getRuntimeContext().getIndexOfThisSubtask();
-							 i < numElements;
-							 i++, val += getRuntimeContext().getNumberOfParallelSubtasks()) {
+							i < numElements;
+							i++, val += getRuntimeContext().getNumberOfParallelSubtasks()) {
 
 							elements[i] = val;
 						}
@@ -99,7 +103,7 @@ public class DataGenerators {
 		Properties props = new Properties();
 		props.putAll(FlinkKafkaProducerBase.getPropertiesFromBrokerList(testServer.getBrokerConnectionString()));
 		Properties secureProps = testServer.getSecureProperties();
-		if(secureProps != null) {
+		if (secureProps != null) {
 			props.putAll(testServer.getSecureProperties());
 		}
 
@@ -119,6 +123,10 @@ public class DataGenerators {
 
 	// ------------------------------------------------------------------------
 
+	/**
+	 * A generator that continuously writes strings into the configured topic. The generation is stopped if an exception
+	 * occurs or {@link #shutdown()} is called.
+	 */
 	public static class InfiniteStringsGenerator extends Thread {
 
 		private final KafkaTestEnvironment server;
@@ -129,7 +137,6 @@ public class DataGenerators {
 
 		private volatile boolean running = true;
 
-
 		public InfiniteStringsGenerator(KafkaTestEnvironment server, String topic) {
 			this.server = server;
 			this.topic = topic;
@@ -164,7 +171,7 @@ public class DataGenerators {
 
 					int len = rnd.nextInt(100) + 1;
 					for (int i = 0; i < len; i++) {
-						bld.append((char) (rnd.nextInt(20) + 'a') );
+						bld.append((char) (rnd.nextInt(20) + 'a'));
 					}
 
 					String next = bld.toString();
@@ -211,7 +218,7 @@ public class DataGenerators {
 			}
 		}
 
-		public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
+		private static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
 
 			@Override
 			public JobExecutionResult execute(String jobName) throws Exception {

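A hedged usage sketch for the InfiniteStringsGenerator documented above; the test environment variable and topic name are placeholders.

// Illustrative fragment from a test method: start the generator, run the consuming job, stop it.
InfiniteStringsGenerator generator = new InfiniteStringsGenerator(kafkaServer, "string-gen-topic");
generator.start();
try {
	// ... run the job that reads from "string-gen-topic" ...
} finally {
	generator.shutdown();
	generator.join();
}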
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
index ec64b00..c25eefb 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
@@ -22,30 +22,35 @@ import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.List;
 
-
-public class FailingIdentityMapper<T> extends RichMapFunction<T,T> implements
+/**
+ * A {@link RichMapFunction} that fails once the configured number of records has been processed.
+ *
+ * @param <T> The type of the records mapped by this function.
+ */
+public class FailingIdentityMapper<T> extends RichMapFunction<T, T> implements
 	ListCheckpointed<Integer>, CheckpointListener, Runnable {
-	
+
 	private static final Logger LOG = LoggerFactory.getLogger(FailingIdentityMapper.class);
-	
+
 	private static final long serialVersionUID = 6334389850158707313L;
-	
+
 	public static volatile boolean failedBefore;
 	public static volatile boolean hasBeenCheckpointedBeforeFailure;
 
 	private final int failCount;
 	private int numElementsTotal;
 	private int numElementsThisTime;
-	
+
 	private boolean failer;
 	private boolean hasBeenCheckpointed;
-	
+
 	private Thread printer;
 	private volatile boolean printerRunning = true;
 
@@ -64,10 +69,10 @@ public class FailingIdentityMapper<T> extends RichMapFunction<T,T> implements
 	public T map(T value) throws Exception {
 		numElementsTotal++;
 		numElementsThisTime++;
-		
+
 		if (!failedBefore) {
 			Thread.sleep(10);
-			
+
 			if (failer && numElementsTotal >= failCount) {
 				hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
 				failedBefore = true;

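A hedged fragment showing how such a mapper is commonly wired into an exactly-once test; the stream, the element count and the constructor argument (assumed to set the fail count) are illustrative.

// Illustrative fragment: reset the static failure flag, then fail once, roughly mid-run.
FailingIdentityMapper.failedBefore = false;
stream
	.map(new FailingIdentityMapper<Integer>(numElements / 2))
	.setParallelism(1);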
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java
index 055326d..0fbe554 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java
@@ -15,12 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.kafka.testutils;
 
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 
 import java.util.Properties;
 
+/**
+ * Test configuration for a Kafka producer.
+ */
 public class FakeStandardProducerConfig {
 
 	public static Properties get() {

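For context, such a fake configuration typically pins down only the handful of properties needed to construct a producer in tests. A hedged sketch of what get() plausibly assembles follows; the broker address is a placeholder, not the file's actual content.

// Sketch of a minimal test producer configuration using standard Kafka producer keys.
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:12345");
props.setProperty("key.serializer", ByteArraySerializer.class.getCanonicalName());
props.setProperty("value.serializer", ByteArraySerializer.class.getCanonicalName());
return props;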
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
index 131325f..9bbe1d3 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
@@ -23,13 +23,16 @@ import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
+/**
+ * Utilities for communicating with a JobManager through an {@link ActorGateway}.
+ */
 public class JobManagerCommunicationUtils {
 
 	private static final FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
@@ -43,7 +46,6 @@ public class JobManagerCommunicationUtils {
 			Object result = Await.result(listResponse, askTimeout);
 			List<JobStatusMessage> jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
 
-
 			if (jobs.isEmpty()) {
 				return;
 			}
@@ -84,13 +86,13 @@ public class JobManagerCommunicationUtils {
 
 	public static void cancelCurrentJob(ActorGateway jobManager, String name) throws Exception {
 		JobStatusMessage status = null;
-		
+
 		for (int i = 0; i < 200; i++) {
 			// find the jobID
 			Future<Object> listResponse = jobManager.ask(
 					JobManagerMessages.getRequestRunningJobsStatus(),
 					askTimeout);
-	
+
 			List<JobStatusMessage> jobs;
 			try {
 				Object result = Await.result(listResponse, askTimeout);
@@ -99,7 +101,7 @@ public class JobManagerCommunicationUtils {
 			catch (Exception e) {
 				throw new Exception("Could not cancel job - failed to retrieve running jobs from the JobManager.", e);
 			}
-		
+
 			if (jobs.isEmpty()) {
 				// try again, fall through the loop
 				Thread.sleep(50);
@@ -107,33 +109,33 @@ public class JobManagerCommunicationUtils {
 			else if (jobs.size() == 1) {
 				status = jobs.get(0);
 			}
-			else if(name != null) {
-				for(JobStatusMessage msg: jobs) {
-					if(msg.getJobName().equals(name)) {
+			else if (name != null) {
+				for (JobStatusMessage msg: jobs) {
+					if (msg.getJobName().equals(name)) {
 						status = msg;
 					}
 				}
-				if(status == null) {
-					throw new Exception("Could not cancel job - no job matched expected name = '" + name +"' in " + jobs);
+				if (status == null) {
+					throw new Exception("Could not cancel job - no job matched expected name = '" + name + "' in " + jobs);
 				}
 			} else {
 				String jobNames = "";
-				for(JobStatusMessage jsm: jobs) {
+				for (JobStatusMessage jsm: jobs) {
 					jobNames += jsm.getJobName() + ", ";
 				}
 				throw new Exception("Could not cancel job - more than one running job: " + jobNames);
 			}
 		}
-		
+
 		if (status == null) {
-			throw new Exception("Could not cancel job - no running jobs");	
+			throw new Exception("Could not cancel job - no running jobs");
 		}
 		else if (status.getJobState().isGloballyTerminalState()) {
 			throw new Exception("Could not cancel job - job is not running any more");
 		}
-		
+
 		JobID jobId = status.getJobId();
-		
+
 		Future<Object> response = jobManager.ask(new JobManagerMessages.CancelJob(jobId), askTimeout);
 		try {
 			Await.result(response, askTimeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
index e105e01..29e469d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/PartitionValidatingMapper.java
@@ -23,14 +23,16 @@ import org.apache.flink.api.common.functions.MapFunction;
 import java.util.HashSet;
 import java.util.Set;
 
-
+/**
+ * {@link MapFunction} that verifies the partitioning of the records it receives.
+ */
 public class PartitionValidatingMapper implements MapFunction<Integer, Integer> {
 
 	private static final long serialVersionUID = 1088381231244959088L;
-	
+
 	/* the partitions from which this function received data */
 	private final Set<Integer> myPartitions = new HashSet<>();
-	
+
 	private final int numPartitions;
 	private final int maxPartitions;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
index 1d61229..040f15c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThrottledMapper.java
@@ -23,10 +23,10 @@ import org.apache.flink.api.common.functions.MapFunction;
 /**
  * An identity map function that sleeps between elements, throttling the
  * processing speed.
- * 
+ *
  * @param <T> The type mapped.
  */
-public class ThrottledMapper<T> implements MapFunction<T,T> {
+public class ThrottledMapper<T> implements MapFunction<T, T> {
 
 	private static final long serialVersionUID = 467008933767159126L;
 
@@ -41,4 +41,4 @@ public class ThrottledMapper<T> implements MapFunction<T,T> {
 		Thread.sleep(this.sleep);
 		return value;
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java
index e7fff52..6f2c4a1 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java
@@ -33,13 +33,13 @@ public class Tuple2FlinkPartitioner extends FlinkKafkaPartitioner<Tuple2<Integer
 	public Tuple2FlinkPartitioner(int expectedPartitions) {
 		this.expectedPartitions = expectedPartitions;
 	}
-	
+
 	@Override
 	public int partition(Tuple2<Integer, Integer> next, byte[] key, byte[] value, String targetTopic, int[] partitions) {
 		if (partitions.length != expectedPartitions) {
 			throw new IllegalArgumentException("Expected " + expectedPartitions + " partitions");
 		}
-		
+
 		return next.f0;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
index 46e70fd..5ace012 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.test.util.SuccessException;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -29,14 +30,17 @@ import java.util.BitSet;
 import java.util.Collections;
 import java.util.List;
 
+/**
+ * A {@link RichSinkFunction} that verifies that no duplicate records are received.
+ */
 public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer> implements ListCheckpointed<Tuple2<Integer, BitSet>> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ValidatingExactlyOnceSink.class);
 
 	private static final long serialVersionUID = 1748426382527469932L;
-	
+
 	private final int numElementsTotal;
-	
+
 	private BitSet duplicateChecker = new BitSet();  // this is checkpointed
 
 	private int numElements; // this is checkpointed
@@ -45,11 +49,10 @@ public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer> impleme
 		this.numElementsTotal = numElementsTotal;
 	}
 
-	
 	@Override
 	public void invoke(Integer value) throws Exception {
 		numElements++;
-		
+
 		if (duplicateChecker.get(value)) {
 			throw new Exception("Received a duplicate: " + value);
 		}

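A hedged fragment showing the sink's typical place at the end of an exactly-once test pipeline; the stream and element count are placeholders.

// Illustrative fragment: the sink throws on any duplicate value it receives.
stream
	.addSink(new ValidatingExactlyOnceSink(numElements))
	.setParallelism(1);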
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java
index 37ed408..9aa1207 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ZooKeeperStringSerializer.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.streaming.connectors.kafka.testutils;
 
-import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.flink.configuration.ConfigConstants;
 
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+
 /**
  * Simple ZooKeeper serializer for Strings.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties
index 16c226f..a3fb2b0 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties
+++ b/flink-connectors/flink-connector-kafka-base/src/test/resources/log4j-test.properties
@@ -26,4 +26,3 @@ log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
 # suppress the irrelevant (wrong) warnings from the netty channel handler
 log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
 
-