You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/28 06:17:42 UTC

[12/21] flink git commit: [FLINK-6711] Activate strict checkstyle for flink-connector-kafka*

[FLINK-6711] Activate strict checkstyle for flink-connector-kafka*


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28e8043b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28e8043b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28e8043b

Branch: refs/heads/master
Commit: 28e8043ba09b47c99439fdb536a4226eccf70c07
Parents: c20b396
Author: zentol <ch...@apache.org>
Authored: Wed May 24 23:55:15 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sun May 28 00:11:17 2017 +0200

----------------------------------------------------------------------
 .../flink-connector-kafka-0.10/pom.xml          |   4 +-
 .../connectors/kafka/FlinkKafkaConsumer010.java |  19 +-
 .../connectors/kafka/FlinkKafkaProducer010.java |  60 ++---
 .../kafka/Kafka010AvroTableSource.java          |   8 +-
 .../kafka/Kafka010JsonTableSource.java          |   4 +-
 .../connectors/kafka/Kafka010TableSource.java   |   4 +-
 .../kafka/internal/Kafka010Fetcher.java         |   9 +-
 .../internal/KafkaConsumerCallBridge010.java    |   8 +-
 .../src/main/resources/log4j.properties         |   1 -
 .../kafka/Kafka010AvroTableSourceTest.java      |   6 +-
 .../connectors/kafka/Kafka010FetcherTest.java   |   3 -
 .../connectors/kafka/Kafka010ITCase.java        |  30 ++-
 .../kafka/Kafka010JsonTableSourceTest.java      |   6 +-
 .../kafka/Kafka010ProducerITCase.java           |   5 +-
 .../kafka/KafkaTestEnvironmentImpl.java         |  41 ++-
 .../flink-connector-kafka-0.8/pom.xml           |   3 +-
 .../connectors/kafka/FlinkKafkaConsumer08.java  |  63 +++--
 .../connectors/kafka/FlinkKafkaConsumer081.java |   1 +
 .../connectors/kafka/FlinkKafkaConsumer082.java |   1 +
 .../connectors/kafka/FlinkKafkaProducer.java    |  10 +-
 .../connectors/kafka/FlinkKafkaProducer08.java  |   3 +-
 .../kafka/Kafka08AvroTableSource.java           |   8 +-
 .../connectors/kafka/Kafka08JsonTableSink.java  |   9 +-
 .../kafka/Kafka08JsonTableSource.java           |   4 +-
 .../connectors/kafka/Kafka08TableSource.java    |   4 +-
 .../kafka/internals/ClosableBlockingQueue.java  |  97 ++++---
 .../kafka/internals/Kafka08Fetcher.java         |  70 +++--
 .../kafka/internals/KillerWatchDog.java         |   2 +-
 .../kafka/internals/PartitionInfoFetcher.java   |   3 +-
 .../internals/PeriodicOffsetCommitter.java      |  23 +-
 .../kafka/internals/SimpleConsumerThread.java   |  96 ++++---
 .../kafka/internals/ZookeeperOffsetHandler.java |  24 +-
 .../kafka/Kafka08AvroTableSourceTest.java       |   6 +-
 .../connectors/kafka/Kafka08ITCase.java         |  16 +-
 .../kafka/Kafka08JsonTableSinkTest.java         |   6 +-
 .../kafka/Kafka08JsonTableSourceTest.java       |   8 +-
 .../connectors/kafka/Kafka08ProducerITCase.java |   4 +-
 .../connectors/kafka/KafkaConsumer08Test.java   |  43 ++--
 .../connectors/kafka/KafkaLocalSystemTime.java  |  48 ----
 .../connectors/kafka/KafkaProducerTest.java     |  18 +-
 .../kafka/KafkaShortRetention08ITCase.java      |   7 +-
 .../kafka/KafkaTestEnvironmentImpl.java         |  35 +--
 .../internals/ClosableBlockingQueueTest.java    |  84 +++---
 .../flink-connector-kafka-0.9/pom.xml           |   4 +-
 .../connectors/kafka/FlinkKafkaConsumer09.java  |  30 +--
 .../connectors/kafka/FlinkKafkaProducer09.java  |   3 +-
 .../kafka/Kafka09AvroTableSource.java           |   8 +-
 .../connectors/kafka/Kafka09JsonTableSink.java  |   9 +-
 .../kafka/Kafka09JsonTableSource.java           |   4 +-
 .../connectors/kafka/Kafka09TableSource.java    |   4 +-
 .../connectors/kafka/internal/Handover.java     |  32 +--
 .../kafka/internal/Kafka09Fetcher.java          |  16 +-
 .../kafka/internal/KafkaConsumerCallBridge.java |   8 +-
 .../kafka/internal/KafkaConsumerThread.java     |  35 ++-
 .../src/main/resources/log4j.properties         |   1 -
 .../kafka/Kafka09AvroTableSourceTest.java       |   6 +-
 .../connectors/kafka/Kafka09FetcherTest.java    |  21 +-
 .../connectors/kafka/Kafka09ITCase.java         |   5 +-
 .../kafka/Kafka09JsonTableSinkTest.java         |   6 +-
 .../kafka/Kafka09JsonTableSourceTest.java       |   8 +-
 .../connectors/kafka/Kafka09ProducerITCase.java |   4 +-
 .../kafka/Kafka09SecuredRunITCase.java          |   8 +-
 .../connectors/kafka/KafkaProducerTest.java     |  20 +-
 .../kafka/KafkaTestEnvironmentImpl.java         |  41 +--
 .../connectors/kafka/internal/HandoverTest.java |  10 +-
 .../src/test/resources/log4j-test.properties    |   2 +-
 .../flink-connector-kafka-base/pom.xml          |   5 +-
 .../kafka/FlinkKafkaConsumerBase.java           | 101 ++++----
 .../kafka/FlinkKafkaProducerBase.java           |  44 ++--
 .../connectors/kafka/KafkaAvroTableSource.java  |  16 +-
 .../connectors/kafka/KafkaJsonTableSink.java    |   7 +-
 .../connectors/kafka/KafkaJsonTableSource.java  |   4 +-
 .../connectors/kafka/KafkaTableSink.java        |  11 +-
 .../connectors/kafka/KafkaTableSource.java      |   3 +-
 .../kafka/config/OffsetCommitMode.java          |   2 +-
 .../connectors/kafka/config/StartupMode.java    |   9 +-
 .../kafka/internals/AbstractFetcher.java        | 110 ++++----
 .../kafka/internals/ExceptionProxy.java         |  37 +--
 .../kafka/internals/KafkaTopicPartition.java    |  15 +-
 .../internals/KafkaTopicPartitionState.java     |  19 +-
 .../KafkaTopicPartitionStateSentinel.java       |   8 +-
 ...picPartitionStateWithPeriodicWatermarks.java |  23 +-
 ...cPartitionStateWithPunctuatedWatermarks.java |  25 +-
 .../partitioner/FlinkFixedPartitioner.java      |  17 +-
 .../FlinkKafkaDelegatePartitioner.java          |   1 +
 .../AvroRowDeserializationSchema.java           |  21 +-
 .../AvroRowSerializationSchema.java             |  13 +-
 .../JSONDeserializationSchema.java              |   6 +-
 .../JSONKeyValueDeserializationSchema.java      |  16 +-
 .../JsonRowDeserializationSchema.java           |   8 +-
 .../JsonRowSerializationSchema.java             |  11 +-
 .../KeyedDeserializationSchema.java             |   4 +-
 .../KeyedDeserializationSchemaWrapper.java      |   4 +-
 .../serialization/KeyedSerializationSchema.java |  10 +-
 .../KeyedSerializationSchemaWrapper.java        |   3 +-
 ...eInformationKeyValueSerializationSchema.java |  32 +--
 .../kafka/AvroRowDeSerializationSchemaTest.java |  10 +-
 .../kafka/FlinkFixedPartitionerTest.java        | 109 ++++++++
 ...inkKafkaConsumerBaseFrom11MigrationTest.java |   5 +-
 ...inkKafkaConsumerBaseFrom12MigrationTest.java |  31 +--
 .../kafka/FlinkKafkaConsumerBaseTest.java       |  23 +-
 .../kafka/FlinkKafkaProducerBaseTest.java       |  37 +--
 .../kafka/JSONDeserializationSchemaTest.java    |   7 +-
 .../JSONKeyValueDeserializationSchemaTest.java  |   8 +-
 .../kafka/JsonRowDeserializationSchemaTest.java |  10 +-
 .../kafka/JsonRowSerializationSchemaTest.java   |   9 +-
 .../KafkaConsumerPartitionAssignmentTest.java   |   4 +-
 .../connectors/kafka/KafkaConsumerTestBase.java | 256 +++++++++----------
 .../connectors/kafka/KafkaProducerTestBase.java |  15 +-
 .../kafka/KafkaShortRetentionTestBase.java      |  30 +--
 .../kafka/KafkaTableSinkTestBase.java           |  15 +-
 .../kafka/KafkaTableSourceTestBase.java         |  22 +-
 .../connectors/kafka/KafkaTestBase.java         |  19 +-
 .../connectors/kafka/KafkaTestEnvironment.java  |  18 +-
 .../kafka/TestFlinkFixedPartitioner.java        | 104 --------
 .../kafka/internals/AbstractFetcherTest.java    |  24 +-
 .../internals/KafkaTopicPartitionTest.java      |  13 +-
 .../kafka/testutils/AvroTestUtils.java          |  18 +-
 .../kafka/testutils/DataGenerators.java         |  37 +--
 .../kafka/testutils/FailingIdentityMapper.java  |  23 +-
 .../testutils/FakeStandardProducerConfig.java   |   4 +
 .../testutils/JobManagerCommunicationUtils.java |  36 +--
 .../testutils/PartitionValidatingMapper.java    |   8 +-
 .../kafka/testutils/ThrottledMapper.java        |   6 +-
 .../kafka/testutils/Tuple2FlinkPartitioner.java |   4 +-
 .../testutils/ValidatingExactlyOnceSink.java    |  11 +-
 .../testutils/ZooKeeperStringSerializer.java    |   3 +-
 .../src/test/resources/log4j-test.properties    |   1 -
 128 files changed, 1361 insertions(+), 1333 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.10/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/pom.xml b/flink-connectors/flink-connector-kafka-0.10/pom.xml
index 231b22e..143cb7f 100644
--- a/flink-connectors/flink-connector-kafka-0.10/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.10/pom.xml
@@ -130,7 +130,7 @@ under the License.
 			<version>${kafka.version}</version>
 			<scope>test</scope>
 		</dependency>
-		
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-tests_${scala.binary.version}</artifactId>
@@ -209,5 +209,5 @@ under the License.
 			</plugin>
 		</plugins>
 	</build>
-	
+
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
index 23fc84e..1bbd1dc 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
@@ -29,21 +29,21 @@ import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.util.PropertiesUtil;
 import org.apache.flink.util.SerializedValue;
+
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 
 import java.util.Collections;
-import java.util.Map;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
-
 /**
  * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
  * Apache Kafka 0.10.x. The consumer can run in multiple parallel instances, each of which will pull
- * data from one or more Kafka partitions. 
- * 
+ * data from one or more Kafka partitions.
+ *
  * <p>The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost
- * during a failure, and that the computation processes elements "exactly once". 
+ * during a failure, and that the computation processes elements "exactly once".
  * (Note: These guarantees naturally assume that Kafka itself does not loose any data.)</p>
  *
  * <p>Please note that Flink snapshots the offsets internally as part of its distributed checkpoints. The offsets
@@ -62,11 +62,10 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
 
 	private static final long serialVersionUID = 2324564345203409112L;
 
-
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Creates a new Kafka streaming source consumer for Kafka 0.10.x
+	 * Creates a new Kafka streaming source consumer for Kafka 0.10.x.
 	 *
 	 * @param topic
 	 *           The name of the topic that should be consumed.
@@ -82,7 +81,7 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
 	/**
 	 * Creates a new Kafka streaming source consumer for Kafka 0.10.x
 	 *
-	 * This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value
+	 * <p>This constructor allows passing a {@link KeyedDeserializationSchema} for reading key/value
 	 * pairs, offsets, and topic names from Kafka.
 	 *
 	 * @param topic
@@ -99,7 +98,7 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
 	/**
 	 * Creates a new Kafka streaming source consumer for Kafka 0.10.x
 	 *
-	 * This constructor allows passing multiple topics to the consumer.
+	 * <p>This constructor allows passing multiple topics to the consumer.
 	 *
 	 * @param topics
 	 *           The Kafka topics to read from.
@@ -115,7 +114,7 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
 	/**
 	 * Creates a new Kafka streaming source consumer for Kafka 0.10.x
 	 *
-	 * This constructor allows passing multiple topics and a key/value deserialization schema.
+	 * <p>This constructor allows passing multiple topics and a key/value deserialization schema.
 	 *
 	 * @param topics
 	 *           The Kafka topics to read from.

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index 711fe07..805bc4e 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -17,8 +17,6 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import java.util.Properties;
-
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
@@ -37,34 +35,35 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
 import org.apache.kafka.clients.producer.ProducerRecord;
 
+import java.util.Properties;
+
 import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPartitionsByTopic;
 import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList;
 
-
 /**
  * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.10.x
  *
- * Implementation note: This producer is a hybrid between a regular regular sink function (a)
+ * <p>Implementation note: This producer is a hybrid between a regular sink function (a)
  * and a custom operator (b).
  *
- * For (a), the class implements the SinkFunction and RichFunction interfaces.
+ * <p>For (a), the class implements the SinkFunction and RichFunction interfaces.
  * For (b), it extends the StreamTask class.
  *
- * Details about approach (a):
- *
+ * <p>Details about approach (a):
  *  Pre Kafka 0.10 producers only follow approach (a), allowing users to use the producer using the
  *  DataStream.addSink() method.
  *  Since the APIs exposed in that variant do not allow accessing the the timestamp attached to the record
  *  the Kafka 0.10 producer has a second invocation option, approach (b).
  *
- * Details about approach (b):
+ * <p>Details about approach (b):
  *  Kafka 0.10 supports writing the timestamp attached to a record to Kafka. When adding the
  *  FlinkKafkaProducer010 using the FlinkKafkaProducer010.writeToKafkaWithTimestamps() method, the Kafka producer
  *  can access the internal record timestamp of the record and write it to Kafka.
  *
- * All methods and constructors in this class are marked with the approach they are needed for.
+ * <p>All methods and constructors in this class are marked with the approach they are needed for.
  */
 public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunction<T>, RichFunction {
 
@@ -79,7 +78,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
 	 *
-	 * This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
+	 * <p>This constructor allows writing timestamps to Kafka, it follows approach (b) (see above)
 	 *
 	 * @param inStream The stream to write to Kafka
 	 * @param topicId ID of the Kafka topic.
@@ -93,12 +92,11 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 		return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<T>());
 	}
 
-
 	/**
 	 * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
 	 * the topic.
 	 *
-	 * This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
+	 * <p>This constructor allows writing timestamps to Kafka, it follows approach (b) (see above)
 	 *
 	 * @param inStream The stream to write to Kafka
 	 * @param topicId ID of the Kafka topic.
@@ -116,7 +114,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
 	 *
-	 * This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
+	 * <p>This constructor allows writing timestamps to Kafka, it follows approach (b) (see above)
 	 *
 	 *  @param inStream The stream to write to Kafka
 	 *  @param topicId The name of the target topic
@@ -212,11 +210,11 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 	public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig) {
 		this(topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<T>());
 	}
-	
+
 	/**
-	 * Create Kafka producer
+	 * Create Kafka producer.
 	 *
-	 * This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above)
+	 * <p>This constructor does not allow writing timestamps to Kafka, it follows approach (a) (see above)
 	 */
 	public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
 		// We create a Kafka 09 producer instance here and only "override" (by intercepting) the
@@ -230,7 +228,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
 	 *
-	 * This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
+	 * <p>This constructor allows writing timestamps to Kafka, it follows approach (b) (see above)
 	 *
 	 *  @param inStream The stream to write to Kafka
 	 *  @param topicId The name of the target topic
@@ -275,9 +273,9 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 	}
 
 	/**
-	 * Create Kafka producer
+	 * Create Kafka producer.
 	 *
-	 * This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above)
+	 * <p>This constructor does not allow writing timestamps to Kafka, it follows approach (a) (see above)
 	 *
 	 * @deprecated This is a deprecated constructor that does not correctly handle partitioning when
 	 *             producing to multiple topics. Use
@@ -306,13 +304,13 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 		}
 
 		Long timestamp = null;
-		if(this.writeTimestampToKafka) {
+		if (this.writeTimestampToKafka) {
 			timestamp = elementTimestamp;
 		}
 
 		ProducerRecord<byte[], byte[]> record;
 		int[] partitions = internalProducer.topicPartitionsMap.get(targetTopic);
-		if(null == partitions) {
+		if (null == partitions) {
 			partitions = getPartitionsByTopic(targetTopic, internalProducer.producer);
 			internalProducer.topicPartitionsMap.put(targetTopic, partitions);
 		}
@@ -329,10 +327,8 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 		internalProducer.producer.send(record, internalProducer.callback);
 	}
 
-
 	// ----------------- Helper methods implementing methods from SinkFunction and RichFunction (Approach (a)) ----
 
-
 	// ---- Configuration setters
 
 	/**
@@ -341,7 +337,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 	 * exceptions will be eventually thrown and cause the streaming program to
 	 * fail (and enter recovery).
 	 *
-	 * Method is only accessible for approach (a) (see above)
+	 * <p>Method is only accessible for approach (a) (see above)
 	 *
 	 * @param logFailuresOnly The flag to indicate logging-only on exceptions.
 	 */
@@ -355,7 +351,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 	 * to be acknowledged by the Kafka producer on a checkpoint.
 	 * This way, the producer can guarantee that messages in the Kafka buffers are part of the checkpoint.
 	 *
-	 * Method is only accessible for approach (a) (see above)
+	 * <p>Method is only accessible for approach (a) (see above)
 	 *
 	 * @param flush Flag indicating the flushing mode (true = flush on checkpoint)
 	 */
@@ -365,8 +361,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 	}
 
 	/**
-	 * This method is used for approach (a) (see above)
-	 *
+	 * This method is used for approach (a) (see above).
 	 */
 	@Override
 	public void open(Configuration parameters) throws Exception {
@@ -375,7 +370,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 	}
 
 	/**
-	 * This method is used for approach (a) (see above)
+	 * This method is used for approach (a) (see above).
 	 */
 	@Override
 	public IterationRuntimeContext getIterationRuntimeContext() {
@@ -384,7 +379,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 	}
 
 	/**
-	 * This method is used for approach (a) (see above)
+	 * This method is used for approach (a) (see above).
 	 */
 	@Override
 	public void setRuntimeContext(RuntimeContext t) {
@@ -395,7 +390,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 	/**
 	 * Invoke method for using the Sink as DataStream.addSink() sink.
 	 *
-	 * This method is used for approach (a) (see above)
+	 * <p>This method is used for approach (a) (see above)
 	 *
 	 * @param value The input record.
 	 */
@@ -404,14 +399,12 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 		invokeInternal(value, Long.MAX_VALUE);
 	}
 
-
 	// ----------------- Helper methods and classes implementing methods from StreamSink (Approach (b)) ----
 
-
 	/**
 	 * Process method for using the sink with timestamp support.
 	 *
-	 * This method is used for approach (b) (see above)
+	 * <p>This method is used for approach (b) (see above)
 	 */
 	@Override
 	public void processElement(StreamRecord<T> element) throws Exception {
@@ -467,5 +460,4 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 		}
 	}
 
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
index 1b2abcc..9921428 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
@@ -18,13 +18,15 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import java.util.Properties;
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.types.Row;
 
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
+
+import java.util.Properties;
+
 /**
  * Kafka {@link StreamTableSource} for Kafka 0.10.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
index 78ef28e..f400f6b 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
@@ -19,9 +19,9 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.types.Row;
-import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
 
 import java.util.Properties;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
index 03e9125..a6de13a 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
@@ -19,9 +19,9 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.types.Row;
-import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
 
 import java.util.Properties;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
index 586d841..eb4dfee 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
@@ -36,10 +36,10 @@ import java.util.Properties;
 
 /**
  * A fetcher that fetches data from Kafka brokers via the Kafka 0.10 consumer API.
- * 
+ *
  * <p>This fetcher re-uses basically all functionality of the 0.9 fetcher. It only additionally
  * takes the KafkaRecord-attached timestamp and attaches it to the Flink records.
- * 
+ *
  * @param <T> The type of elements produced by the fetcher.
  */
 public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
@@ -57,8 +57,7 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
 			KeyedDeserializationSchema<T> deserializer,
 			Properties kafkaProperties,
 			long pollTimeout,
-			boolean useMetrics) throws Exception
-	{
+			boolean useMetrics) throws Exception {
 		super(
 				sourceContext,
 				assignedPartitionsWithInitialOffsets,
@@ -88,7 +87,7 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
 
 	/**
 	 * This method needs to be overridden because Kafka broke binary compatibility between 0.9 and 0.10,
-	 * changing binary signatures
+	 * changing binary signatures.
 	 */
 	@Override
 	protected KafkaConsumerCallBridge010 createCallBridge() {

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
index 0fda9a6..b621140 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
@@ -26,11 +26,11 @@ import java.util.List;
 
 /**
  * The ConsumerCallBridge simply calls the {@link KafkaConsumer#assign(java.util.Collection)} method.
- * 
- * This indirection is necessary, because Kafka broke binary compatibility between 0.9 and 0.10,
+ *
+ * <p>This indirection is necessary, because Kafka broke binary compatibility between 0.9 and 0.10,
  * changing {@code assign(List)} to {@code assign(Collection)}.
- * 
- * Because of that, we need two versions whose compiled code goes against different method signatures.
+ *
+ * <p>Because of that, we need two versions whose compiled code goes against different method signatures.
  */
 public class KafkaConsumerCallBridge010 extends KafkaConsumerCallBridge {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties
index 6bdfb48..6eef174 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties
@@ -26,4 +26,3 @@ log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
 # suppress the irrelevant (wrong) warnings from the netty channel handler
 log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
 
-

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java
index ed93725..025fefc 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java
@@ -18,12 +18,16 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import java.util.Properties;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.types.Row;
 
+import java.util.Properties;
+
+/**
+ * Tests for the {@link Kafka010AvroTableSource}.
+ */
 public class Kafka010AvroTableSourceTest extends KafkaTableSourceTestBase {
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
index 2d0551d..aedd4ba 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
@@ -39,10 +39,8 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.TopicPartition;
-
 import org.junit.Test;
 import org.junit.runner.RunWith;
-
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -63,7 +61,6 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyLong;
 import static org.powermock.api.mockito.PowerMockito.doAnswer;

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
index add623e..22193b7 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
@@ -38,13 +38,17 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
+
 import org.junit.Test;
 
 import javax.annotation.Nullable;
+
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 
-
+/**
+ * IT cases for Kafka 0.10 .
+ */
 public class Kafka010ITCase extends KafkaConsumerTestBase {
 
 	// ------------------------------------------------------------------------
@@ -83,7 +87,6 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 		runFailOnDeployTest();
 	}
 
-
 	// --- source to partition mappings and exactly once ---
 
 	@Test(timeout = 60000)
@@ -170,7 +173,7 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 	}
 
 	/**
-	 * Kafka 0.10 specific test, ensuring Timestamps are properly written to and read from Kafka
+	 * Kafka 0.10 specific test, ensuring Timestamps are properly written to and read from Kafka.
 	 */
 	@Test(timeout = 60000)
 	public void testTimestamps() throws Exception {
@@ -193,9 +196,9 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 			@Override
 			public void run(SourceContext<Long> ctx) throws Exception {
 				long i = 0;
-				while(running) {
-					ctx.collectWithTimestamp(i, i*2);
-					if(i++ == 1000L) {
+				while (running) {
+					ctx.collectWithTimestamp(i, i * 2);
+					if (i++ == 1000L) {
 						running = false;
 					}
 				}
@@ -213,7 +216,7 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 
 			@Override
 			public int partition(Long next, byte[] key, byte[] value, String targetTopic, int[] partitions) {
-				return (int)(next % 3);
+				return (int) (next % 3);
 			}
 		});
 		prod.setParallelism(3);
@@ -235,7 +238,7 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 			@Nullable
 			@Override
 			public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) {
-				if(lastElement % 10 == 0) {
+				if (lastElement % 10 == 0) {
 					return new Watermark(lastElement);
 				}
 				return null;
@@ -278,7 +281,7 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 		@Override
 		public void processElement(StreamRecord<Long> element) throws Exception {
 			elCount++;
-			if(element.getValue() * 2 != element.getTimestamp()) {
+			if (element.getValue() * 2 != element.getTimestamp()) {
 				throw new RuntimeException("Invalid timestamp: " + element);
 			}
 		}
@@ -287,13 +290,13 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 		public void processWatermark(Watermark mark) throws Exception {
 			wmCount++;
 
-			if(lastWM <= mark.getTimestamp()) {
+			if (lastWM <= mark.getTimestamp()) {
 				lastWM = mark.getTimestamp();
 			} else {
 				throw new RuntimeException("Received watermark higher than the last one");
 			}
 
-			if( mark.getTimestamp() % 10 != 0 && mark.getTimestamp() != Long.MAX_VALUE ) {
+			if (mark.getTimestamp() % 10 != 0 && mark.getTimestamp() != Long.MAX_VALUE) {
 				throw new RuntimeException("Invalid watermark: " + mark.getTimestamp());
 			}
 		}
@@ -301,11 +304,11 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 		@Override
 		public void close() throws Exception {
 			super.close();
-			if(elCount != 1000L) {
+			if (elCount != 1000L) {
 				throw new RuntimeException("Wrong final element count " + elCount);
 			}
 
-			if(wmCount <= 2) {
+			if (wmCount <= 2) {
 				throw new RuntimeException("Almost no watermarks have been sent " + wmCount);
 			}
 		}
@@ -322,6 +325,7 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 			this.ti = TypeInfoParser.parse("Long");
 			this.ser = ti.createSerializer(new ExecutionConfig());
 		}
+
 		@Override
 		public TypeInformation<Long> getProducedType() {
 			return ti;

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java
index 55e8b9c..092f5ea 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java
@@ -18,12 +18,16 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import java.util.Properties;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
 import org.apache.flink.types.Row;
 
+import java.util.Properties;
+
+/**
+ * Tests for the {@link Kafka010JsonTableSource}.
+ */
 public class Kafka010JsonTableSourceTest extends KafkaTableSourceTestBase {
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
index 42b9682..64a5a3f 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
@@ -18,10 +18,11 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-
 import org.junit.Test;
 
-
+/**
+ * IT cases for the {@link FlinkKafkaProducer010}.
+ */
 @SuppressWarnings("serial")
 public class Kafka010ProducerITCase extends KafkaProducerTestBase {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index c88c858..cb30fbf 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -15,9 +15,17 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.util.NetUtils;
+
 import kafka.admin.AdminUtils;
 import kafka.common.KafkaException;
 import kafka.server.KafkaConfig;
@@ -27,14 +35,6 @@ import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-import org.apache.flink.util.NetUtils;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
@@ -57,7 +57,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
- * An implementation of the KafkaServerProvider for Kafka 0.10
+ * An implementation of the KafkaServerProvider for Kafka 0.10 .
  */
 public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 
@@ -87,7 +87,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	@Override
 	public Properties getSecureProperties() {
 		Properties prop = new Properties();
-		if(secureMode) {
+		if (secureMode) {
 			prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
 			prop.put("security.protocol", "SASL_PLAINTEXT");
 			prop.put("sasl.kerberos.service.name", "kafka");
@@ -95,7 +95,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			//add special timeout for Travis
 			prop.setProperty("zookeeper.session.timeout.ms", String.valueOf(zkTimeout));
 			prop.setProperty("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout));
-			prop.setProperty("metadata.fetch.timeout.ms","120000");
+			prop.setProperty("metadata.fetch.timeout.ms", "120000");
 		}
 		return prop;
 	}
@@ -122,7 +122,6 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		return new StreamSink<>(prod);
 	}
 
-
 	@Override
 	public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
 		FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
@@ -176,7 +175,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	@Override
 	public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) {
 		//increase the timeout since in Travis ZK connection takes long time for secure connection.
-		if(secureMode) {
+		if (secureMode) {
 			//run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
 			numKafkaServers = 1;
 			zkTimeout = zkTimeout * 15;
@@ -203,7 +202,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		brokers = null;
 
 		try {
-			zookeeper = new TestingServer(-	1, tmpZkDir);
+			zookeeper = new TestingServer(-1, tmpZkDir);
 			zookeeperConnectionString = zookeeper.getConnectString();
 			LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
 
@@ -213,7 +212,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			for (int i = 0; i < numKafkaServers; i++) {
 				brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
 
-				if(secureMode) {
+				if (secureMode) {
 					brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.SASL_PLAINTEXT)) + ",";
 				} else {
 					brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ",";
@@ -299,7 +298,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		final long deadline = System.nanoTime() + 30_000_000_000L;
 		do {
 			try {
-				if(secureMode) {
+				if (secureMode) {
 					//increase wait time since in Travis ZK timeout occurs frequently
 					int wait = zkTimeout / 100;
 					LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic);
@@ -315,7 +314,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 
 			// create a new ZK utils connection
 			ZkUtils checkZKConn = getZkUtils();
-			if(AdminUtils.topicExists(checkZKConn, topic)) {
+			if (AdminUtils.topicExists(checkZKConn, topic)) {
 				checkZKConn.close();
 				return;
 			}
@@ -343,7 +342,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	/**
-	 * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
+	 * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed).
 	 */
 	protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Exception {
 		Properties kafkaProperties = new Properties();
@@ -359,7 +358,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		// for CI stability, increase zookeeper session timeout
 		kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout);
 		kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout);
-		if(additionalServerProperties != null) {
+		if (additionalServerProperties != null) {
 			kafkaProperties.putAll(additionalServerProperties);
 		}
 
@@ -370,7 +369,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			kafkaProperties.put("port", Integer.toString(kafkaPort));
 
 			//to support secure kafka cluster
-			if(secureMode) {
+			if (secureMode) {
 				LOG.info("Adding Kafka secure configurations");
 				kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
 				kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.8/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/pom.xml b/flink-connectors/flink-connector-kafka-0.8/pom.xml
index 5e2ed2d..b6b0336 100644
--- a/flink-connectors/flink-connector-kafka-0.8/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.8/pom.xml
@@ -175,7 +175,6 @@ under the License.
 
 	</dependencies>
 
-
 	<build>
 		<plugins>
 			<plugin>
@@ -215,5 +214,5 @@ under the License.
 			</plugin>
 		</plugins>
 	</build>
-	
+
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
index 858a790..6c7b94d 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -17,13 +17,6 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import kafka.cluster.Broker;
-import kafka.common.ErrorMapping;
-import kafka.javaapi.PartitionMetadata;
-import kafka.javaapi.TopicMetadata;
-import kafka.javaapi.TopicMetadataRequest;
-import kafka.javaapi.consumer.SimpleConsumer;
-
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -39,6 +32,12 @@ import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.PropertiesUtil;
 import org.apache.flink.util.SerializedValue;
 
+import kafka.cluster.Broker;
+import kafka.common.ErrorMapping;
+import kafka.javaapi.PartitionMetadata;
+import kafka.javaapi.TopicMetadata;
+import kafka.javaapi.TopicMetadataRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.Node;
 
@@ -46,25 +45,25 @@ import java.net.InetAddress;
 import java.net.URL;
 import java.net.UnknownHostException;
 import java.nio.channels.ClosedChannelException;
-import java.util.Collections;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 
-import static org.apache.flink.util.PropertiesUtil.getInt;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.PropertiesUtil.getInt;
 
 /**
  * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
  * Apache Kafka 0.8.x. The consumer can run in multiple parallel instances, each of which will pull
- * data from one or more Kafka partitions. 
- * 
+ * data from one or more Kafka partitions.
+ *
  * <p>The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost
- * during a failure, and that the computation processes elements "exactly once". 
+ * during a failure, and that the computation processes elements "exactly once".
  * (Note: These guarantees naturally assume that Kafka itself does not loose any data.)</p>
- * 
+ *
  * <p>Flink's Kafka Consumer is designed to be compatible with Kafka's High-Level Consumer API (0.8.x).
  * Most of Kafka's configuration variables can be used with this consumer as well:
  *         <ul>
@@ -74,11 +73,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *             <li>auto.offset.reset with the values "largest", "smallest"</li>
  *             <li>fetch.wait.max.ms</li>
  *         </ul>
- *     </li>
- * </ul>
- * 
+ *
  * <h1>Offset handling</h1>
- * 
+ *
  * <p>Offsets whose records have been read and are checkpointed will be committed back to ZooKeeper
  * by the offset handler. In addition, the offset handler finds the point where the source initially
  * starts reading from the stream, when the streaming job is started.</p>
@@ -93,7 +90,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * <p>When using a Kafka topic to send data between Flink jobs, we recommend using the
  * {@see TypeInformationSerializationSchema} and {@see TypeInformationKeyValueSerializationSchema}.</p>
- * 
+ *
  * <p><b>NOTE:</b> The implementation currently accesses partition metadata when the consumer
  * is constructed. That means that the client that submits the program needs to be able to
  * reach the Kafka brokers or ZooKeeper.</p>
@@ -102,7 +99,7 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
 
 	private static final long serialVersionUID = -6272159445203409112L;
 
-	/** Configuration key for the number of retries for getting the partition info */
+	/** Configuration key for the number of retries for getting the partition info. */
 	public static final String GET_PARTITIONS_RETRIES_KEY = "flink.get-partitions.retry";
 
 	/** Default number of retries for getting the partition info. One retry means going through the full list of brokers */
@@ -110,13 +107,13 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
 
 	// ------------------------------------------------------------------------
 
-	/** The properties to parametrize the Kafka consumer and ZooKeeper client */ 
+	/** The properties to parametrize the Kafka consumer and ZooKeeper client. */
 	private final Properties kafkaProperties;
 
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Creates a new Kafka streaming source consumer for Kafka 0.8.x
+	 * Creates a new Kafka streaming source consumer for Kafka 0.8.x.
 	 *
 	 * @param topic
 	 *           The name of the topic that should be consumed.
@@ -132,7 +129,7 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
 	/**
 	 * Creates a new Kafka streaming source consumer for Kafka 0.8.x
 	 *
-	 * This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value
+	 * <p>This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value
 	 * pairs, offsets, and topic names from Kafka.
 	 *
 	 * @param topic
@@ -149,7 +146,7 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
 	/**
 	 * Creates a new Kafka streaming source consumer for Kafka 0.8.x
 	 *
-	 * This constructor allows passing multiple topics to the consumer.
+	 * <p>This constructor allows passing multiple topics to the consumer.
 	 *
 	 * @param topics
 	 *           The Kafka topics to read from.
@@ -165,8 +162,8 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
 	/**
 	 * Creates a new Kafka streaming source consumer for Kafka 0.8.x
 	 *
-	 * This constructor allows passing multiple topics and a key/value deserialization schema.
-	 * 
+	 * <p>This constructor allows passing multiple topics and a key/value deserialization schema.
+	 *
 	 * @param topics
 	 *           The Kafka topics to read from.
 	 * @param deserializer
@@ -245,14 +242,14 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
 
 	/**
 	 * Send request to Kafka to get partitions for topic.
-	 * 
+	 *
 	 * @param topics The name of the topics.
-	 * @param properties The properties for the Kafka Consumer that is used to query the partitions for the topic. 
+	 * @param properties The properties for the Kafka Consumer that is used to query the partitions for the topic.
 	 */
 	public static List<KafkaTopicPartitionLeader> getPartitionsForTopic(List<String> topics, Properties properties) {
 		String seedBrokersConfString = properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
 		final int numRetries = getInt(properties, GET_PARTITIONS_RETRIES_KEY, DEFAULT_GET_PARTITIONS_RETRIES);
-		
+
 		checkNotNull(seedBrokersConfString, "Configuration property %s not set", ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
 		String[] seedBrokers = seedBrokersConfString.split(",");
 		List<KafkaTopicPartitionLeader> partitions = new ArrayList<>();
@@ -328,7 +325,7 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
 	}
 
 	/**
-	 * Turn a broker instance into a node instance
+	 * Turn a broker instance into a node instance.
 	 * @param broker broker instance
 	 * @return Node representing the given broker
 	 */
@@ -337,7 +334,7 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
 	}
 
 	/**
-	 * Validate the ZK configuration, checking for required parameters
+	 * Validate the ZK configuration, checking for required parameters.
 	 * @param props Properties to check
 	 */
 	protected static void validateZooKeeperConfig(Properties props) {
@@ -348,7 +345,7 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
 			throw new IllegalArgumentException("Required property '" + ConsumerConfig.GROUP_ID_CONFIG
 					+ "' has not been set in the properties");
 		}
-		
+
 		try {
 			//noinspection ResultOfMethodCallIgnored
 			Integer.parseInt(props.getProperty("zookeeper.session.timeout.ms", "0"));
@@ -356,7 +353,7 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
 		catch (NumberFormatException e) {
 			throw new IllegalArgumentException("Property 'zookeeper.session.timeout.ms' is not a valid integer");
 		}
-		
+
 		try {
 			//noinspection ResultOfMethodCallIgnored
 			Integer.parseInt(props.getProperty("zookeeper.connection.timeout.ms", "0"));
@@ -369,7 +366,7 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
 	/**
 	 * Validate that at least one seed broker is valid in case of a
 	 * ClosedChannelException.
-	 * 
+	 *
 	 * @param seedBrokers
 	 *            array containing the seed brokers e.g. ["host1:port1",
 	 *            "host2:port2"]

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
index 4e4050f..4102bf8 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
index aeefcc8..7ba5103 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 98dac3e..434286e 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -22,8 +22,8 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import java.util.Properties;
 
+import java.util.Properties;
 
 /**
  * THIS CLASS IS DEPRECATED. Use FlinkKafkaProducer08 instead.
@@ -38,7 +38,7 @@ public class FlinkKafkaProducer<IN> extends FlinkKafkaProducer08<IN>  {
 	 */
 	@Deprecated
 	public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
-		super(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), (FlinkKafkaPartitioner<IN>)null);
+		super(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), (FlinkKafkaPartitioner<IN>) null);
 	}
 
 	/**
@@ -46,7 +46,7 @@ public class FlinkKafkaProducer<IN> extends FlinkKafkaProducer08<IN>  {
 	 */
 	@Deprecated
 	public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
-		super(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, (FlinkKafkaPartitioner<IN>)null);
+		super(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, (FlinkKafkaPartitioner<IN>) null);
 	}
 
 	/**
@@ -63,7 +63,7 @@ public class FlinkKafkaProducer<IN> extends FlinkKafkaProducer08<IN>  {
 	 */
 	@Deprecated
 	public FlinkKafkaProducer(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
-		super(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), (FlinkKafkaPartitioner<IN>)null);
+		super(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), (FlinkKafkaPartitioner<IN>) null);
 	}
 
 	/**
@@ -71,7 +71,7 @@ public class FlinkKafkaProducer<IN> extends FlinkKafkaProducer08<IN>  {
 	 */
 	@Deprecated
 	public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
-		super(topicId, serializationSchema, producerConfig, (FlinkKafkaPartitioner<IN>)null);
+		super(topicId, serializationSchema, producerConfig, (FlinkKafkaPartitioner<IN>) null);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
index 08dcb2f..a14768b 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
@@ -27,11 +27,10 @@ import org.apache.flink.streaming.util.serialization.SerializationSchema;
 
 import java.util.Properties;
 
-
 /**
  * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.8.
  *
- * Please note that this producer does not have any reliability guarantees.
+ * <p>Please note that this producer does not have any reliability guarantees.
  *
  * @param <IN> Type of the messages to write into Kafka.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
index 1a68c05..a1bea78 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
@@ -18,13 +18,15 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import java.util.Properties;
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.types.Row;
 
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
+
+import java.util.Properties;
+
 /**
  * Kafka {@link StreamTableSource} for Kafka 0.8.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
index 80bd180..79406d8 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
@@ -15,13 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.types.Row;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.types.Row;
 
 import java.util.Properties;
 
@@ -29,9 +30,9 @@ import java.util.Properties;
  * Kafka 0.8 {@link KafkaTableSink} that serializes data in JSON format.
  */
 public class Kafka08JsonTableSink extends KafkaJsonTableSink {
-	
+
 	/**
-	 * Creates {@link KafkaTableSink} for Kafka 0.8
+	 * Creates {@link KafkaTableSink} for Kafka 0.8.
 	 *
 	 * @param topic topic in Kafka to which table is written
 	 * @param properties properties to connect to Kafka
@@ -42,7 +43,7 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink {
 	}
 
 	/**
-	 * Creates {@link KafkaTableSink} for Kafka 0.8
+	 * Creates {@link KafkaTableSink} for Kafka 0.8.
 	 *
 	 * @param topic topic in Kafka to which table is written
 	 * @param properties properties to connect to Kafka

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
index 1555a3b..05a2c71 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
@@ -19,9 +19,9 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.types.Row;
-import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
 
 import java.util.Properties;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
index e1e481c..9536306 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
@@ -19,9 +19,9 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.types.Row;
-import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
 
 import java.util.Properties;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
index e31dcac..da61dd0 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
@@ -38,27 +38,27 @@ import static java.util.Objects.requireNonNull;
  *         are available and mark themselves as shut down.</li>
  *     <li>The queue allows to poll batches of elements in one polling call.</li>
  * </ol>
- * 
- * The queue has no capacity restriction and is safe for multiple producers and consumers.
- * 
+ *
+ * <p>The queue has no capacity restriction and is safe for multiple producers and consumers.
+ *
  * <p>Note: Null elements are prohibited.
- * 
+ *
  * @param <E> The type of elements in the queue.
  */
 public class ClosableBlockingQueue<E> {
 
-	/** The lock used to make queue accesses and open checks atomic */
+	/** The lock used to make queue accesses and open checks atomic. */
 	private final ReentrantLock lock;
-	
-	/** The condition on which blocking get-calls wait if the queue is empty */
+
+	/** The condition on which blocking get-calls wait if the queue is empty. */
 	private final Condition nonEmpty;
-	
-	/** The deque of elements */
+
+	/** The deque of elements. */
 	private final ArrayDeque<E> elements;
-	
-	/** Flag marking the status of the queue */
+
+	/** Flag marking the status of the queue. */
 	private volatile boolean open;
-	
+
 	// ------------------------------------------------------------------------
 
 	/**
@@ -72,22 +72,21 @@ public class ClosableBlockingQueue<E> {
 	 * Creates a new empty queue, reserving space for at least the specified number
 	 * of elements. The queue can still grow, if more elements are added than the
 	 * reserved space.
-	 * 
+	 *
 	 * @param initialSize The number of elements to reserve space for.
 	 */
 	public ClosableBlockingQueue(int initialSize) {
 		this.lock = new ReentrantLock(true);
 		this.nonEmpty = this.lock.newCondition();
-		
+
 		this.elements = new ArrayDeque<>(initialSize);
 		this.open = true;
-		
-		
+
 	}
 
 	/**
 	 * Creates a new queue that contains the given elements.
-	 * 
+	 *
 	 * @param initialElements The elements to initially add to the queue.
 	 */
 	public ClosableBlockingQueue(Collection<? extends E> initialElements) {
@@ -127,7 +126,7 @@ public class ClosableBlockingQueue<E> {
 	public boolean isOpen() {
 		return open;
 	}
-	
+
 	/**
 	 * Tries to close the queue. Closing the queue only succeeds when no elements are
 	 * in the queue when this method is called. Checking whether the queue is empty, and
@@ -155,25 +154,25 @@ public class ClosableBlockingQueue<E> {
 			lock.unlock();
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Adding / Removing elements
 	// ------------------------------------------------------------------------
-	
+
 	/**
 	 * Tries to add an element to the queue, if the queue is still open. Checking whether the queue
 	 * is open and adding the element is one atomic operation.
-	 * 
+	 *
 	 * <p>Unlike the {@link #add(Object)} method, this method never throws an exception,
 	 * but only indicates via the return code if the element was added or the
 	 * queue was closed.
-	 * 
+	 *
 	 * @param element The element to add.
 	 * @return True, if the element was added, false if the queue was closed.
 	 */
 	public boolean addIfOpen(E element) {
 		requireNonNull(element);
-		
+
 		lock.lock();
 		try {
 			if (open) {
@@ -191,7 +190,7 @@ public class ClosableBlockingQueue<E> {
 	/**
 	 * Adds the element to the queue, or fails with an exception, if the queue is closed.
 	 * Checking whether the queue is open and adding the element is one atomic operation.
-	 * 
+	 *
 	 * @param element The element to add.
 	 * @throws IllegalStateException Thrown, if the queue is closed.
 	 */
@@ -215,13 +214,13 @@ public class ClosableBlockingQueue<E> {
 
 	/**
 	 * Returns the queue's next element without removing it, if the queue is non-empty.
-	 * Otherwise, returns null. 
+	 * Otherwise, returns null.
 	 *
 	 * <p>The method throws an {@code IllegalStateException} if the queue is closed.
 	 * Checking whether the queue is open and getting the next element is one atomic operation.
-	 * 
+	 *
 	 * <p>This method never blocks.
-	 * 
+	 *
 	 * @return The queue's next element, or null, if the queue is empty.
 	 * @throws IllegalStateException Thrown, if the queue is closed.
 	 */
@@ -244,7 +243,7 @@ public class ClosableBlockingQueue<E> {
 
 	/**
 	 * Returns the queue's next element and removes it, if the queue is non-empty.
-	 * Otherwise, this method returns null. 
+	 * Otherwise, this method returns null.
 	 *
 	 * <p>The method throws an {@code IllegalStateException} if the queue is closed.
 	 * Checking whether the queue is open and removing the next element is one atomic operation.
@@ -273,7 +272,7 @@ public class ClosableBlockingQueue<E> {
 
 	/**
 	 * Returns all of the queue's current elements in a list, if the queue is non-empty.
-	 * Otherwise, this method returns null. 
+	 * Otherwise, this method returns null.
 	 *
 	 * <p>The method throws an {@code IllegalStateException} if the queue is closed.
 	 * Checking whether the queue is open and removing the elements is one atomic operation.
@@ -305,12 +304,12 @@ public class ClosableBlockingQueue<E> {
 	/**
 	 * Returns the next element in the queue. If the queue is empty, this method
 	 * waits until at least one element is added.
-	 * 
+	 *
 	 * <p>The method throws an {@code IllegalStateException} if the queue is closed.
 	 * Checking whether the queue is open and removing the next element is one atomic operation.
-	 * 
+	 *
 	 * @return The next element in the queue, never null.
-	 * 
+	 *
 	 * @throws IllegalStateException Thrown, if the queue is closed.
 	 * @throws InterruptedException Thrown, if the thread is interrupted while waiting for an
 	 *                              element to be added.
@@ -321,7 +320,7 @@ public class ClosableBlockingQueue<E> {
 			while (open && elements.isEmpty()) {
 				nonEmpty.await();
 			}
-			
+
 			if (open) {
 				return elements.removeFirst();
 			} else {
@@ -336,13 +335,13 @@ public class ClosableBlockingQueue<E> {
 	 * Returns the next element in the queue. If the queue is empty, this method
 	 * waits at most a certain time until an element becomes available. If no element
 	 * is available after that time, the method returns null.
-	 * 
+	 *
 	 * <p>The method throws an {@code IllegalStateException} if the queue is closed.
 	 * Checking whether the queue is open and removing the next element is one atomic operation.
-	 * 
+	 *
 	 * @param timeoutMillis The number of milliseconds to block, at most.
 	 * @return The next element in the queue, or null, if the timeout expires before an element is available.
-	 * 
+	 *
 	 * @throws IllegalStateException Thrown, if the queue is closed.
 	 * @throws InterruptedException Thrown, if the thread is interrupted while waiting for an
 	 *                              element to be added.
@@ -354,16 +353,16 @@ public class ClosableBlockingQueue<E> {
 		} else if (timeoutMillis < 0L) {
 			throw new IllegalArgumentException("invalid timeout");
 		}
-		
+
 		final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
-		
+
 		lock.lock();
 		try {
-			while (open && elements.isEmpty() && timeoutMillis > 0) { 
+			while (open && elements.isEmpty() && timeoutMillis > 0) {
 				nonEmpty.await(timeoutMillis, TimeUnit.MILLISECONDS);
 				timeoutMillis = (deadline - System.nanoTime()) / 1_000_000L;
 			}
-			
+
 			if (!open) {
 				throw new IllegalStateException("queue is closed");
 			}
@@ -383,12 +382,12 @@ public class ClosableBlockingQueue<E> {
 	 * at least one element is added.
 	 *
 	 * <p>This method always returns a list with at least one element.
-	 * 
+	 *
 	 * <p>The method throws an {@code IllegalStateException} if the queue is closed.
 	 * Checking whether the queue is open and removing the next element is one atomic operation.
-	 * 
+	 *
 	 * @return A list with all elements in the queue, always at least one element.
-	 * 
+	 *
 	 * @throws IllegalStateException Thrown, if the queue is closed.
 	 * @throws InterruptedException Thrown, if the thread is interrupted while waiting for an
 	 *                              element to be added.
@@ -415,13 +414,13 @@ public class ClosableBlockingQueue<E> {
 	 * Gets all the elements found in the list, or blocks until at least one element
 	 * was added. This method is similar to {@link #getBatchBlocking()}, but takes
 	 * a number of milliseconds that the method will maximally wait before returning.
-	 * 
+	 *
 	 * <p>This method never returns null, but an empty list, if the queue is empty when
 	 * the method is called and the request times out before an element was added.
-	 * 
+	 *
 	 * <p>The method throws an {@code IllegalStateException} if the queue is closed.
 	 * Checking whether the queue is open and removing the next element is one atomic operation.
-	 * 
+	 *
 	 * @param timeoutMillis The number of milliseconds to wait, at most.
 	 * @return A list with all elements in the queue, possibly an empty list.
 	 *
@@ -461,11 +460,11 @@ public class ClosableBlockingQueue<E> {
 			lock.unlock();
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Standard Utilities
 	// ------------------------------------------------------------------------
-	
+
 	@Override
 	public int hashCode() {
 		int hashCode = 17;
@@ -482,7 +481,7 @@ public class ClosableBlockingQueue<E> {
 		} else if (obj != null && obj.getClass() == ClosableBlockingQueue.class) {
 			@SuppressWarnings("unchecked")
 			ClosableBlockingQueue<E> that = (ClosableBlockingQueue<E>) obj;
-			
+
 			if (this.elements.size() == that.elements.size()) {
 				Iterator<E> thisElements = this.elements.iterator();
 				for (E thatNext : that.elements) {