Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/28 06:17:39 UTC

[09/21] flink git commit: [FLINK-6711] Activate strict checkstyle for flink-connector-kafka*

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
index c736493..06cdf2c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
@@ -19,24 +19,25 @@
 package org.apache.flink.streaming.connectors.kafka.internals;
 
 import javax.annotation.Nullable;
+
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * A proxy that communicates exceptions between threads. Typically used if an exception
  * from a spawned thread needs to be recognized by the "parent" (spawner) thread.
- * 
+ *
  * <p>The spawned thread would set the exception via {@link #reportError(Throwable)}.
  * The parent would check (at certain points) for exceptions via {@link #checkAndThrowException()}.
  * Optionally, the parent can pass itself in the constructor to be interrupted as soon as
  * an exception occurs.
- * 
+ *
  * <pre>
  * {@code
- * 
+ *
  * final ExceptionProxy errorProxy = new ExceptionProxy(Thread.currentThread());
- * 
+ *
  * Thread subThread = new Thread() {
- * 
+ *
  *     public void run() {
  *         try {
  *             doSomething();
@@ -48,13 +49,13 @@ import java.util.concurrent.atomic.AtomicReference;
  *     }
  * };
  * subThread.start();
- * 
+ *
  * doSomethingElse();
  * errorProxy.checkAndThrowException();
- * 
+ *
  * doSomethingMore();
  * errorProxy.checkAndThrowException();
- * 
+ *
  * try {
  *     subThread.join();
  * } catch (InterruptedException e) {
@@ -66,33 +67,33 @@ import java.util.concurrent.atomic.AtomicReference;
  * </pre>
  */
 public class ExceptionProxy {
-	
-	/** The thread that should be interrupted when an exception occurs */
+
+	/** The thread that should be interrupted when an exception occurs. */
 	private final Thread toInterrupt;
-	
-	/** The exception to throw */ 
+
+	/** The exception to throw. */
 	private final AtomicReference<Throwable> exception;
 
 	/**
 	 * Creates an exception proxy that interrupts the given thread upon
 	 * report of an exception. The thread to interrupt may be null.
-	 * 
+	 *
 	 * @param toInterrupt The thread to interrupt upon an exception. May be null.
 	 */
 	public ExceptionProxy(@Nullable Thread toInterrupt) {
 		this.toInterrupt = toInterrupt;
 		this.exception = new AtomicReference<>();
 	}
-	
+
 	// ------------------------------------------------------------------------
-	
+
 	/**
 	 * Sets the exception and interrupts the target thread,
 	 * if no other exception has occurred so far.
-	 * 
+	 *
 	 * <p>The exception is only set (and the interruption is only triggered),
 	 * if no other exception was set before.
-	 * 
+	 *
 	 * @param t The exception that occurred
 	 */
 	public void reportError(Throwable t) {
@@ -105,7 +106,7 @@ public class ExceptionProxy {
 	/**
 	 * Checks whether an exception has been set via {@link #reportError(Throwable)}.
 	 * If yes, that exception if re-thrown by this method.
-	 * 
+	 *
 	 * @throws Exception This method re-throws the exception, if set.
 	 */
 	public void checkAndThrowException() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
index c68fe28..f3645e3 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
@@ -27,16 +27,16 @@ import static java.util.Objects.requireNonNull;
 /**
  * Flink's description of a partition in a Kafka topic.
  * Serializable, and common across all Kafka consumer subclasses (0.8, 0.9, ...)
- * 
+ *
  * <p>Note: This class must not change in its structure, because it would change the
  * serialization format and make previous savepoints unreadable.
  */
 public final class KafkaTopicPartition implements Serializable {
 
 	/** THIS SERIAL VERSION UID MUST NOT CHANGE, BECAUSE IT WOULD BREAK
-	 * READING OLD SERIALIZED INSTANCES FROM SAVEPOINTS */
+	 * READING OLD SERIALIZED INSTANCES FROM SAVEPOINTS. */
 	private static final long serialVersionUID = 722083576322742325L;
-	
+
 	// ------------------------------------------------------------------------
 
 	private final String topic;
@@ -50,7 +50,7 @@ public final class KafkaTopicPartition implements Serializable {
 	}
 
 	// ------------------------------------------------------------------------
-	
+
 	public String getTopic() {
 		return topic;
 	}
@@ -60,7 +60,7 @@ public final class KafkaTopicPartition implements Serializable {
 	}
 
 	// ------------------------------------------------------------------------
-	
+
 	@Override
 	public String toString() {
 		return "KafkaTopicPartition{" +
@@ -87,7 +87,7 @@ public final class KafkaTopicPartition implements Serializable {
 	public int hashCode() {
 		return cachedHash;
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
@@ -109,10 +109,9 @@ public final class KafkaTopicPartition implements Serializable {
 		return sb.toString();
 	}
 
-
 	public static List<KafkaTopicPartition> dropLeaderData(List<KafkaTopicPartitionLeader> partitionInfos) {
 		List<KafkaTopicPartition> ret = new ArrayList<>(partitionInfos.size());
-		for(KafkaTopicPartitionLeader ktpl: partitionInfos) {
+		for (KafkaTopicPartitionLeader ktpl: partitionInfos) {
 			ret.add(ktpl.getTopicPartition());
 		}
 		return ret;

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
index adfbf79..78ab612 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
@@ -20,31 +20,31 @@ package org.apache.flink.streaming.connectors.kafka.internals;
 /**
  * The state that the Flink Kafka Consumer holds for each Kafka partition.
  * Includes the Kafka descriptor for partitions.
- * 
+ *
  * <p>This class describes the most basic state (only the offset), subclasses
  * define more elaborate state, containing current watermarks and timestamp
  * extractors.
- * 
+ *
  * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions.
  */
 public class KafkaTopicPartitionState<KPH> {
-	
+
 	// ------------------------------------------------------------------------
 
-	/** The Flink description of a Kafka partition */
+	/** The Flink description of a Kafka partition. */
 	private final KafkaTopicPartition partition;
 
-	/** The Kafka description of a Kafka partition (varies across different Kafka versions) */
+	/** The Kafka description of a Kafka partition (varies across different Kafka versions). */
 	private final KPH kafkaPartitionHandle;
-	
-	/** The offset within the Kafka partition that we already processed */
+
+	/** The offset within the Kafka partition that we already processed. */
 	private volatile long offset;
 
-	/** The offset of the Kafka partition that has been committed */
+	/** The offset of the Kafka partition that has been committed. */
 	private volatile long committedOffset;
 
 	// ------------------------------------------------------------------------
-	
+
 	public KafkaTopicPartitionState(KafkaTopicPartition partition, KPH kafkaPartitionHandle) {
 		this.partition = partition;
 		this.kafkaPartitionHandle = kafkaPartitionHandle;
@@ -103,7 +103,6 @@ public class KafkaTopicPartitionState<KPH> {
 		return committedOffset;
 	}
 
-	
 	// ------------------------------------------------------------------------
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
index 153a326..c218618 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka.internals;
 /**
  * Magic values used to represent special offset states before partitions are actually read.
  *
- * The values are all negative. Negative offsets are not used by Kafka (invalid), so we
+ * <p>The values are all negative. Negative offsets are not used by Kafka (invalid), so we
  * pick a number that is probably (hopefully) not used by Kafka as a magic number for anything else.
  */
 public class KafkaTopicPartitionStateSentinel {
@@ -31,7 +31,7 @@ public class KafkaTopicPartitionStateSentinel {
 	/**
 	 * Magic number that defines the partition should start from the earliest offset.
 	 *
-	 * This is used as a placeholder so that the actual earliest offset can be evaluated lazily
+	 * <p>This is used as a placeholder so that the actual earliest offset can be evaluated lazily
 	 * when the partition will actually start to be read by the consumer.
 	 */
 	public static final long EARLIEST_OFFSET = -915623761775L;
@@ -39,7 +39,7 @@ public class KafkaTopicPartitionStateSentinel {
 	/**
 	 * Magic number that defines the partition should start from the latest offset.
 	 *
-	 * This is used as a placeholder so that the actual latest offset can be evaluated lazily
+	 * <p>This is used as a placeholder so that the actual latest offset can be evaluated lazily
 	 * when the partition will actually start to be read by the consumer.
 	 */
 	public static final long LATEST_OFFSET = -915623761774L;
@@ -47,7 +47,7 @@ public class KafkaTopicPartitionStateSentinel {
 	/**
 	 * Magic number that defines the partition should start from its committed group offset in Kafka.
 	 *
-	 * This is used as a placeholder so that the actual committed group offset can be evaluated lazily
+	 * <p>This is used as a placeholder so that the actual committed group offset can be evaluated lazily
 	 * when the partition will actually start to be read by the consumer.
 	 */
 	public static final long GROUP_OFFSET = -915623761773L;
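
To make the role of these magic values concrete, here is a small standalone sketch (not part of this commit; the describeStartPosition helper is purely illustrative) of how a restored offset is typically matched against the sentinels before the first record is fetched:

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;

public class SentinelExample {

	// Hypothetical helper: decide where to start reading based on the restored offset.
	static String describeStartPosition(long restoredOffset) {
		if (restoredOffset == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
			return "seek to the earliest offset";
		} else if (restoredOffset == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
			return "seek to the latest offset";
		} else if (restoredOffset == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
			return "seek to the committed group offset in Kafka";
		} else {
			// a real (non-negative) offset restored from a checkpoint or savepoint
			return "resume from offset " + (restoredOffset + 1);
		}
	}

	public static void main(String[] args) {
		System.out.println(describeStartPosition(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET));
		System.out.println(describeStartPosition(42L));
	}
}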

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
index efdc73f..5116e9f 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
@@ -23,36 +23,35 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 /**
  * A special version of the per-kafka-partition-state that additionally holds
  * a periodic watermark generator (and timestamp extractor) per partition.
- * 
+ *
  * @param <T> The type of records handled by the watermark generator
  * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions.
  */
 public final class KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> extends KafkaTopicPartitionState<KPH> {
-	
-	/** The timestamp assigner and watermark generator for the partition */
+
+	/** The timestamp assigner and watermark generator for the partition. */
 	private final AssignerWithPeriodicWatermarks<T> timestampsAndWatermarks;
-	
-	/** The last watermark timestamp generated by this partition */
+
+	/** The last watermark timestamp generated by this partition. */
 	private long partitionWatermark;
 
 	// ------------------------------------------------------------------------
-	
+
 	public KafkaTopicPartitionStateWithPeriodicWatermarks(
 			KafkaTopicPartition partition, KPH kafkaPartitionHandle,
-			AssignerWithPeriodicWatermarks<T> timestampsAndWatermarks)
-	{
+			AssignerWithPeriodicWatermarks<T> timestampsAndWatermarks) {
 		super(partition, kafkaPartitionHandle);
-		
+
 		this.timestampsAndWatermarks = timestampsAndWatermarks;
 		this.partitionWatermark = Long.MIN_VALUE;
 	}
 
 	// ------------------------------------------------------------------------
-	
+
 	public long getTimestampForRecord(T record, long kafkaEventTimestamp) {
 		return timestampsAndWatermarks.extractTimestamp(record, kafkaEventTimestamp);
 	}
-	
+
 	public long getCurrentWatermarkTimestamp() {
 		Watermark wm = timestampsAndWatermarks.getCurrentWatermark();
 		if (wm != null) {
@@ -62,7 +61,7 @@ public final class KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> extend
 	}
 
 	// ------------------------------------------------------------------------
-	
+
 	@Override
 	public String toString() {
 		return "KafkaTopicPartitionStateWithPeriodicWatermarks: partition=" + getKafkaTopicPartition()

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
index edf40ce..f4a80a4 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
@@ -25,35 +25,34 @@ import javax.annotation.Nullable;
 /**
  * A special version of the per-kafka-partition-state that additionally holds
  * a periodic watermark generator (and timestamp extractor) per partition.
- * 
+ *
  * <p>This class is not thread safe, but it gives volatile access to the current
  * partition watermark ({@link #getCurrentPartitionWatermark()}).
- * 
+ *
  * @param <T> The type of records handled by the watermark generator
  * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions
  */
 public final class KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> extends KafkaTopicPartitionState<KPH> {
-	
-	/** The timestamp assigner and watermark generator for the partition */
+
+	/** The timestamp assigner and watermark generator for the partition. */
 	private final AssignerWithPunctuatedWatermarks<T> timestampsAndWatermarks;
-	
-	/** The last watermark timestamp generated by this partition */
+
+	/** The last watermark timestamp generated by this partition. */
 	private volatile long partitionWatermark;
 
 	// ------------------------------------------------------------------------
-	
+
 	public KafkaTopicPartitionStateWithPunctuatedWatermarks(
 			KafkaTopicPartition partition, KPH kafkaPartitionHandle,
-			AssignerWithPunctuatedWatermarks<T> timestampsAndWatermarks)
-	{
+			AssignerWithPunctuatedWatermarks<T> timestampsAndWatermarks) {
 		super(partition, kafkaPartitionHandle);
-		
+
 		this.timestampsAndWatermarks = timestampsAndWatermarks;
 		this.partitionWatermark = Long.MIN_VALUE;
 	}
 
 	// ------------------------------------------------------------------------
-	
+
 	public long getTimestampForRecord(T record, long kafkaEventTimestamp) {
 		return timestampsAndWatermarks.extractTimestamp(record, kafkaEventTimestamp);
 	}
@@ -69,13 +68,13 @@ public final class KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> exte
 			return null;
 		}
 	}
-	
+
 	public long getCurrentPartitionWatermark() {
 		return partitionWatermark;
 	}
 
 	// ------------------------------------------------------------------------
-	
+
 	@Override
 	public String toString() {
 		return "KafkaTopicPartitionStateWithPunctuatedWatermarks: partition=" + getKafkaTopicPartition()

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
index e47c667..6ed3717 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.kafka.partitioner;
 
 import org.apache.flink.util.Preconditions;
@@ -22,9 +23,9 @@ import org.apache.flink.util.Preconditions;
 /**
  * A partitioner ensuring that each internal Flink partition ends up in one Kafka partition.
  *
- * Note, one Kafka partition can contain multiple Flink partitions.
+ * <p>Note, one Kafka partition can contain multiple Flink partitions.
  *
- * Cases:
+ * <p>Cases:
  * 	# More Flink partitions than kafka partitions
  * <pre>
  * 		Flink Sinks:		Kafka Partitions
@@ -35,7 +36,7 @@ import org.apache.flink.util.Preconditions;
  * </pre>
  * Some (or all) kafka partitions contain the output of more than one flink partition
  *
- *# Fewer Flink partitions than Kafka
+ * <p>Fewer Flink partitions than Kafka
  * <pre>
  * 		Flink Sinks:		Kafka Partitions
  * 			1	----------------&gt;	1
@@ -45,9 +46,9 @@ import org.apache.flink.util.Preconditions;
  * 										5
  * </pre>
  *
- *  Not all Kafka partitions contain data
- *  To avoid such an unbalanced partitioning, use a round-robin kafka partitioner (note that this will
- *  cause a lot of network connections between all the Flink instances and all the Kafka brokers).
+ * <p>Not all Kafka partitions contain data
+ * To avoid such an unbalanced partitioning, use a round-robin kafka partitioner (note that this will
+ * cause a lot of network connections between all the Flink instances and all the Kafka brokers).
  */
 public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
 
@@ -60,13 +61,13 @@ public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
 
 		this.parallelInstanceId = parallelInstanceId;
 	}
-	
+
 	@Override
 	public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
 		Preconditions.checkArgument(
 			partitions != null && partitions.length > 0,
 			"Partitions of the target topic is empty.");
-		
+
 		return partitions[parallelInstanceId % partitions.length];
 	}
 }
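
The mapping described in the Javadoc above boils down to partitions[parallelInstanceId % partitions.length]. A small standalone sketch (not part of this commit) that exercises the partitioner for three sink subtasks writing into two Kafka partitions:

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;

public class FixedPartitionerExample {
	public static void main(String[] args) {
		int[] kafkaPartitions = {0, 1}; // two Kafka partitions
		int parallelism = 3;            // three Flink sink subtasks

		for (int subtask = 0; subtask < parallelism; subtask++) {
			FlinkFixedPartitioner<String> partitioner = new FlinkFixedPartitioner<>();
			partitioner.open(subtask, parallelism);
			int target = partitioner.partition("record", null, null, "topic", kafkaPartitions);
			// subtask 0 -> partition 0, subtask 1 -> partition 1, subtask 2 -> partition 0
			System.out.println("subtask " + subtask + " -> Kafka partition " + target);
		}
	}
}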

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
index b7b4143..168e76b 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.kafka.partitioner;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
index 37241f5..0713738 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
@@ -14,11 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.util.serialization;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.util.List;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
@@ -28,13 +29,15 @@ import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.avro.util.Utf8;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
 
 /**
  * Deserialization schema from Avro bytes over {@link SpecificRecord} to {@link Row}.
  *
- * Deserializes the <code>byte[]</code> messages into (nested) Flink Rows.
+ * <p>Deserializes the <code>byte[]</code> messages into (nested) Flink Rows.
  *
  * {@link Utf8} is converted to regular Java Strings.
  */
@@ -56,7 +59,7 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 	private final MutableByteArrayInputStream inputStream;
 
 	/**
-	 * Avro decoder that decodes binary data
+	 * Avro decoder that decodes binary data.
 	 */
 	private final Decoder decoder;
 
@@ -133,9 +136,7 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 	 * InputStream instance, copying message to process, and creation of Decoder on every new message.
 	 */
 	private static final class MutableByteArrayInputStream extends ByteArrayInputStream {
-		/**
-		 * Create MutableByteArrayInputStream
-		 */
+
 		public MutableByteArrayInputStream() {
 			super(new byte[0]);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
index 8388ab5..450c78f 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
@@ -15,11 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.util.serialization;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.List;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
@@ -30,8 +31,10 @@ import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.avro.util.Utf8;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
 
 /**
  * Serialization schema that serializes {@link Row} over {@link SpecificRecord} into a Avro bytes.

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
index d170058..095e964 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.util.serialization;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -21,11 +22,10 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 
 import java.io.IOException;
 
-
 /**
  * DeserializationSchema that deserializes a JSON String into an ObjectNode.
- * <p>
- * Fields can be accessed by calling objectNode.get(&lt;name>).as(&lt;type>)
+ *
+ * <p>Fields can be accessed by calling objectNode.get(&lt;name>).as(&lt;type>)
  */
 public class JSONDeserializationSchema extends AbstractDeserializationSchema<ObjectNode> {
 	private ObjectMapper mapper;
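
A brief usage sketch of the get(&lt;name>).as(&lt;type>) access pattern mentioned in the Javadoc (not part of this commit; the field names are made up for illustration):

import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema;

import com.fasterxml.jackson.databind.node.ObjectNode;

import java.nio.charset.StandardCharsets;

public class JsonDeserializationExample {
	public static void main(String[] args) throws Exception {
		JSONDeserializationSchema schema = new JSONDeserializationSchema();

		byte[] message = "{\"user\":\"alice\",\"clicks\":3}".getBytes(StandardCharsets.UTF_8);
		ObjectNode node = schema.deserialize(message);

		String user = node.get("user").asText(); // "alice"
		int clicks = node.get("clicks").asInt(); // 3
		System.out.println(user + " clicked " + clicks + " times");
	}
}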

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
index 261a111..f75df0c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
@@ -14,12 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.util.serialization;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 import java.io.IOException;
 
@@ -27,12 +29,12 @@ import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;
 
 /**
  * DeserializationSchema that deserializes a JSON String into an ObjectNode.
- * <p>
- * Key fields can be accessed by calling objectNode.get("key").get(&lt;name>).as(&lt;type>)
- * <p>
- * Value fields can be accessed by calling objectNode.get("value").get(&lt;name>).as(&lt;type>)
- * <p>
- * Metadata fields can be accessed by calling objectNode.get("metadata").get(&lt;name>).as(&lt;type>) and include
+ *
+ * <p>Key fields can be accessed by calling objectNode.get("key").get(&lt;name>).as(&lt;type>)
+ *
+ * <p>Value fields can be accessed by calling objectNode.get("value").get(&lt;name>).as(&lt;type>)
+ *
+ * <p>Metadata fields can be accessed by calling objectNode.get("metadata").get(&lt;name>).as(&lt;type>) and include
  * the "offset" (long), "topic" (String) and "partition" (int).
  */
 public class JSONKeyValueDeserializationSchema implements KeyedDeserializationSchema<ObjectNode> {
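
A brief usage sketch of the key/value/metadata access pattern from the Javadoc (not part of this commit; the boolean "include metadata" constructor argument and the field names are assumptions made for illustration):

import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;

import com.fasterxml.jackson.databind.node.ObjectNode;

import java.nio.charset.StandardCharsets;

public class JsonKeyValueExample {
	public static void main(String[] args) throws Exception {
		JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(true);

		byte[] key = "{\"id\":7}".getBytes(StandardCharsets.UTF_8);
		byte[] value = "{\"amount\":19.99}".getBytes(StandardCharsets.UTF_8);
		ObjectNode node = schema.deserialize(key, value, "orders", 0, 123L);

		long id = node.get("key").get("id").asLong();               // 7
		double amount = node.get("value").get("amount").asDouble(); // 19.99
		long offset = node.get("metadata").get("offset").asLong();  // 123
		System.out.println("order " + id + " = " + amount + " @ offset " + offset);
	}
}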

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
index be201fa..f335c30 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
@@ -18,14 +18,16 @@
 
 package org.apache.flink.streaming.util.serialization;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.IOException;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+
 /**
  * Deserialization schema from JSON to {@link Row}.
  *

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
index 1998aa6..a3fa379 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
@@ -14,14 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.util.serialization;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 
 /**
  * Serialization schema that serializes an object into a JSON bytes.
@@ -33,9 +34,9 @@ import org.apache.flink.util.Preconditions;
  * {@link JsonRowDeserializationSchema}.
  */
 public class JsonRowSerializationSchema implements SerializationSchema<Row> {
-	/** Fields names in the input Row object */
+	/** Fields names in the input Row object. */
 	private final String[] fieldNames;
-	/** Object mapper that is used to create output JSON objects */
+	/** Object mapper that is used to create output JSON objects. */
 	private static ObjectMapper mapper = new ObjectMapper();
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
index b5a33bc..234a96d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
@@ -26,7 +26,7 @@ import java.io.Serializable;
  * The deserialization schema describes how to turn the byte key / value messages delivered by certain
  * data sources (for example Apache Kafka) into data types (Java/Scala objects) that are
  * processed by Flink.
- * 
+ *
  * @param <T> The type created by the keyed deserialization schema.
  */
 public interface KeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
@@ -46,7 +46,7 @@ public interface KeyedDeserializationSchema<T> extends Serializable, ResultTypeQ
 	/**
 	 * Method to decide whether the element signals the end of the stream. If
 	 * true is returned the element won't be emitted.
-	 * 
+	 *
 	 * @param nextElement The element to test for the end-of-stream signal.
 	 *
 	 * @return True, if the element signals end of stream, false otherwise.
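
For context, a minimal sketch (not part of this commit) of a custom implementation of this interface that concatenates the Kafka key and value into a single String per record:

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class KeyValueAsStringSchema implements KeyedDeserializationSchema<String> {

	private static final long serialVersionUID = 1L;

	@Override
	public String deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
			throws IOException {
		String key = messageKey == null ? "" : new String(messageKey, StandardCharsets.UTF_8);
		String value = message == null ? "" : new String(message, StandardCharsets.UTF_8);
		return key + "=" + value;
	}

	@Override
	public boolean isEndOfStream(String nextElement) {
		return false; // never signal end-of-stream for an unbounded Kafka topic
	}

	@Override
	public TypeInformation<String> getProducedType() {
		return BasicTypeInfo.STRING_TYPE_INFO;
	}
}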

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
index 4b9dba2..e128aba 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.util.serialization;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -22,7 +23,7 @@ import java.io.IOException;
 
 /**
  * A simple wrapper for using the DeserializationSchema with the KeyedDeserializationSchema
- * interface
+ * interface.
  * @param <T> The type created by the deserialization schema.
  */
 public class KeyedDeserializationSchemaWrapper<T> implements KeyedDeserializationSchema<T> {
@@ -34,6 +35,7 @@ public class KeyedDeserializationSchemaWrapper<T> implements KeyedDeserializatio
 	public KeyedDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema) {
 		this.deserializationSchema = deserializationSchema;
 	}
+
 	@Override
 	public T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
 		return deserializationSchema.deserialize(message);

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
index 701281e..12bcab9 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.util.serialization;
 
 import java.io.Serializable;
@@ -22,7 +23,7 @@ import java.io.Serializable;
  * The serialization schema describes how to turn a data object into a different serialized
  * representation. Most data sinks (for example Apache Kafka) require the data to be handed
  * to them in a specific format (for example as byte strings).
- * 
+ *
  * @param <T> The type to be serialized.
  */
 public interface KeyedSerializationSchema<T> extends Serializable {
@@ -36,17 +37,16 @@ public interface KeyedSerializationSchema<T> extends Serializable {
 	 */
 	byte[] serializeKey(T element);
 
-
 	/**
-	 * Serializes the value of the incoming element to a byte array
-	 * 
+	 * Serializes the value of the incoming element to a byte array.
+	 *
 	 * @param element The incoming element to be serialized
 	 * @return the value of the element as a byte array
 	 */
 	byte[] serializeValue(T element);
 
 	/**
-	 * Optional method to determine the target topic for the element
+	 * Optional method to determine the target topic for the element.
 	 *
 	 * @param element Incoming element to determine the target topic from
 	 * @return null or the target topic
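
And the producer-side counterpart, a minimal sketch (not part of this commit) of an implementation for String records; the getTargetTopic method name is assumed from the existing interface, since only its Javadoc appears in this hunk:

import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

import java.nio.charset.StandardCharsets;

public class StringKeyedSerializationSchema implements KeyedSerializationSchema<String> {

	private static final long serialVersionUID = 1L;

	@Override
	public byte[] serializeKey(String element) {
		// use the first character as the message key (purely illustrative)
		return element.isEmpty() ? null : element.substring(0, 1).getBytes(StandardCharsets.UTF_8);
	}

	@Override
	public byte[] serializeValue(String element) {
		return element.getBytes(StandardCharsets.UTF_8);
	}

	@Override
	public String getTargetTopic(String element) {
		return null; // null means: use the producer's default topic
	}
}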

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
index 1b3e486..0a181d1 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
@@ -14,11 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.util.serialization;
 
 /**
  * A simple wrapper for using the SerializationSchema with the KeyedDeserializationSchema
- * interface
+ * interface.
  * @param <T> The type to serialize
  */
 public class KeyedSerializationSchemaWrapper<T> implements KeyedSerializationSchema<T> {

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
index 51bc8d1..3e0cdb5 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
@@ -32,30 +32,29 @@ import java.io.IOException;
 /**
  * A serialization and deserialization schema for Key Value Pairs that uses Flink's serialization stack to
  * transform typed from and to byte arrays.
- * 
+ *
  * @param <K> The key type to be serialized.
  * @param <V> The value type to be serialized.
  */
-public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDeserializationSchema<Tuple2<K, V>>, KeyedSerializationSchema<Tuple2<K,V>> {
+public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDeserializationSchema<Tuple2<K, V>>, KeyedSerializationSchema<Tuple2<K, V>> {
 
 	private static final long serialVersionUID = -5359448468131559102L;
 
-	/** The serializer for the key */
+	/** The serializer for the key. */
 	private final TypeSerializer<K> keySerializer;
 
-	/** The serializer for the value */
+	/** The serializer for the value. */
 	private final TypeSerializer<V> valueSerializer;
 
-	/** reusable input deserialization buffer */
+	/** reusable input deserialization buffer. */
 	private final DataInputDeserializer inputDeserializer;
-	
-	/** reusable output serialization buffer for the key */
+
+	/** reusable output serialization buffer for the key. */
 	private transient DataOutputSerializer keyOutputSerializer;
 
-	/** reusable output serialization buffer for the value */
+	/** reusable output serialization buffer for the value. */
 	private transient DataOutputSerializer valueOutputSerializer;
-	
-	
+
 	/** The type information, to be returned by {@link #getProducedType()}. It is
 	 * transient, because it is not serializable. Note that this means that the type information
 	 * is not available at runtime, but only prior to the first serialization / deserialization */
@@ -80,10 +79,10 @@ public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDe
 	/**
 	 * Creates a new de-/serialization schema for the given types. This constructor accepts the types
 	 * as classes and internally constructs the type information from the classes.
-	 * 
+	 *
 	 * <p>If the types are parametrized and cannot be fully defined via classes, use the constructor
 	 * that accepts {@link TypeInformation} instead.
-	 * 
+	 *
 	 * @param keyClass The class of the key de-/serialized by this schema.
 	 * @param valueClass The class of the value de-/serialized by this schema.
 	 * @param config The execution config, which is used to parametrize the type serializers.
@@ -94,12 +93,11 @@ public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDe
 
 	// ------------------------------------------------------------------------
 
-
 	@Override
 	public Tuple2<K, V> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
 		K key = null;
 		V value = null;
-		
+
 		if (messageKey != null) {
 			inputDeserializer.setBuffer(messageKey, 0, messageKey.length);
 			key = keySerializer.deserialize(inputDeserializer);
@@ -117,11 +115,10 @@ public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDe
 	 * @return Returns false.
 	 */
 	@Override
-	public boolean isEndOfStream(Tuple2<K,V> nextElement) {
+	public boolean isEndOfStream(Tuple2<K, V> nextElement) {
 		return false;
 	}
 
-
 	@Override
 	public byte[] serializeKey(Tuple2<K, V> element) {
 		if (element.f0 == null) {
@@ -182,9 +179,8 @@ public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDe
 		return null; // we are never overriding the topic
 	}
 
-
 	@Override
-	public TypeInformation<Tuple2<K,V>> getProducedType() {
+	public TypeInformation<Tuple2<K, V>> getProducedType() {
 		if (typeInfo != null) {
 			return typeInfo;
 		}
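
A short usage sketch (not part of this commit) showing a serialize/deserialize round trip with the class-based constructor described in the Javadoc above:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema;

public class TypeInfoKeyValueExample {
	public static void main(String[] args) throws Exception {
		TypeInformationKeyValueSerializationSchema<String, Integer> schema =
			new TypeInformationKeyValueSerializationSchema<>(String.class, Integer.class, new ExecutionConfig());

		Tuple2<String, Integer> record = Tuple2.of("clicks", 42);

		byte[] key = schema.serializeKey(record);
		byte[] value = schema.serializeValue(record);

		// round trip: topic, partition and offset are only metadata here
		Tuple2<String, Integer> restored = schema.deserialize(key, value, "topic", 0, 0L);
		System.out.println(restored.f0 + " -> " + restored.f1);
	}
}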

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
index e13968e..d5be274 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
@@ -15,18 +15,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.kafka;
 
-import java.io.IOException;
-import org.apache.avro.specific.SpecificRecord;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.connectors.kafka.testutils.AvroTestUtils;
 import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.AvroRowSerializationSchema;
 import org.apache.flink.types.Row;
-import static org.junit.Assert.assertEquals;
+
+import org.apache.avro.specific.SpecificRecord;
 import org.junit.Test;
 
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
 /**
  * Test for the Avro serialization and deserialization schema.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkFixedPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkFixedPartitionerTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkFixedPartitionerTest.java
new file mode 100644
index 0000000..b62bdd5
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkFixedPartitionerTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for the {@link FlinkFixedPartitioner}.
+ */
+public class FlinkFixedPartitionerTest {
+
+	/**
+	 * Test for when there are more sinks than partitions.
+	 * <pre>
+	 *   		Flink Sinks:		Kafka Partitions
+	 * 			1	---------------->	1
+	 * 			2   --------------/
+	 * 			3   -------------/
+	 * 			4	------------/
+	 * </pre>
+	 */
+	@Test
+	public void testMoreFlinkThanBrokers() {
+		FlinkFixedPartitioner<String> part = new FlinkFixedPartitioner<>();
+
+		int[] partitions = new int[]{0};
+
+		part.open(0, 4);
+		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+		part.open(1, 4);
+		Assert.assertEquals(0, part.partition("abc2", null, null, null, partitions));
+
+		part.open(2, 4);
+		Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions));
+		Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions)); // check if it is changing ;)
+
+		part.open(3, 4);
+		Assert.assertEquals(0, part.partition("abc4", null, null, null, partitions));
+	}
+
+	/**
+	 * Tests for when there are more partitions than sinks.
+	 * <pre>
+	 * 		Flink Sinks:		Kafka Partitions
+	 * 			1	---------------->	1
+	 * 			2	---------------->	2
+	 * 									3
+	 * 									4
+	 * 									5
+	 *
+	 * </pre>
+	 */
+	@Test
+	public void testFewerPartitions() {
+		FlinkFixedPartitioner<String> part = new FlinkFixedPartitioner<>();
+
+		int[] partitions = new int[]{0, 1, 2, 3, 4};
+		part.open(0, 2);
+		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+		part.open(1, 2);
+		Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
+		Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
+	}
+
+	/*
+	 * 		Flink Sinks:		Kafka Partitions
+	 * 			1	------------>--->	1
+	 * 			2	-----------/----> 	2
+	 * 			3	----------/
+	 */
+	@Test
+	public void testMixedCase() {
+		FlinkFixedPartitioner<String> part = new FlinkFixedPartitioner<>();
+		int[] partitions = new int[]{0, 1};
+
+		part.open(0, 3);
+		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+		part.open(1, 3);
+		Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
+
+		part.open(2, 3);
+		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java
index c07ebd5..77417ab 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -29,6 +30,7 @@ import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.util.SerializedValue;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -36,9 +38,9 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.Map;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.mockito.Mockito.mock;
 
@@ -316,7 +318,6 @@ public class FlinkKafkaConsumerBaseFrom11MigrationTest {
 		}
 	}
 
-
 	// ------------------------------------------------------------------------
 
 	private interface FetcherFactory<T> extends Serializable {

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom12MigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom12MigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom12MigrationTest.java
index f11bf9f..f13cbe0 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom12MigrationTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom12MigrationTest.java
@@ -15,19 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.connectors.kafka;
 
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+package org.apache.flink.streaming.connectors.kafka;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
@@ -45,12 +35,25 @@ import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OperatorSnapshotUtil;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.util.SerializedValue;
+
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 /**
  * Tests for checking whether {@link FlinkKafkaConsumerBase} can restore from snapshots that were
  * done using the Flink 1.2 {@link FlinkKafkaConsumerBase}.
@@ -60,7 +63,7 @@ import org.mockito.stubbing.Answer;
  */
 public class FlinkKafkaConsumerBaseFrom12MigrationTest {
 
-	final static HashMap<KafkaTopicPartition, Long> PARTITION_STATE = new HashMap<>();
+	private static final HashMap<KafkaTopicPartition, Long> PARTITION_STATE = new HashMap<>();
 
 	static {
 		PARTITION_STATE.put(new KafkaTopicPartition("abc", 13), 16768L);
@@ -101,7 +104,6 @@ public class FlinkKafkaConsumerBaseFrom12MigrationTest {
 		StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator =
 				new StreamSource<>(consumerFunction);
 
-
 		final AbstractStreamOperatorTestHarness<String> testHarness =
 				new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0);
 
@@ -307,8 +309,7 @@ public class FlinkKafkaConsumerBaseFrom12MigrationTest {
 		}
 	}
 
-
-	private static abstract class DummySourceContext
+	private abstract static class DummySourceContext
 			implements SourceFunction.SourceContext<String> {
 
 		private final Object lock = new Object();

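Taken together, the import changes above all converge on the same layout: Flink's own imports first, then other third-party imports, then javax/java, then static imports, with one blank line between groups and a blank line after the license header. A minimal, hypothetical sketch of a file in that shape (the class and test below are invented for illustration and are not part of this commit):

/*
 * (Apache license header, followed by one blank line before the package declaration.)
 */

package org.apache.flink.streaming.connectors.kafka;

// Flink's own imports come first ...
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

// ... then other third-party imports (kafka, jackson, junit, mockito, ...) ...
import org.junit.Test;

// ... then javax.* and java.* ...
import java.util.HashMap;
import java.util.Map;

// ... and static imports last.
import static org.junit.Assert.assertEquals;

/**
 * Tests illustrating the checkstyle-conformant import grouping.
 */
public class ImportOrderExampleTest {

	@Test
	public void testPartitionStateMap() {
		// Same constructor call as in the migration test above, used here only as a stand-in value.
		Map<KafkaTopicPartition, Long> state = new HashMap<>();
		state.put(new KafkaTopicPartition("abc", 13), 16768L);
		assertEquals(1, state.size());
	}
}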
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index ccf2ed2..d673e8e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.commons.collections.map.LinkedMap;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
@@ -35,6 +34,8 @@ import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.util.SerializedValue;
+
+import org.apache.commons.collections.map.LinkedMap;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Matchers;
@@ -61,6 +62,9 @@ import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+/**
+ * Tests for the {@link FlinkKafkaConsumerBase}.
+ */
 public class FlinkKafkaConsumerBaseTest {
 
 	/**
@@ -77,12 +81,12 @@ public class FlinkKafkaConsumerBaseTest {
 			new DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks<Object>) null);
 			fail();
 		} catch (NullPointerException ignored) {}
-		
+
 		@SuppressWarnings("unchecked")
 		final AssignerWithPeriodicWatermarks<String> periodicAssigner = mock(AssignerWithPeriodicWatermarks.class);
 		@SuppressWarnings("unchecked")
 		final AssignerWithPunctuatedWatermarks<String> punctuatedAssigner = mock(AssignerWithPunctuatedWatermarks.class);
-		
+
 		DummyFlinkKafkaConsumer<String> c1 = new DummyFlinkKafkaConsumer<>();
 		c1.assignTimestampsAndWatermarks(periodicAssigner);
 		try {
@@ -189,7 +193,7 @@ public class FlinkKafkaConsumerBaseTest {
 	}
 
 	/**
-	 * Tests that on snapshots, states and offsets to commit to Kafka are correct
+	 * Tests that on snapshots, states and offsets to commit to Kafka are correct.
 	 */
 	@SuppressWarnings("unchecked")
 	@Test
@@ -301,7 +305,7 @@ public class FlinkKafkaConsumerBaseTest {
 		state3.put(new KafkaTopicPartition("def", 7), 987654377L);
 
 		// --------------------------------------------------------------------
-		
+
 		final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
 		when(fetcher.snapshotCurrentState()).thenReturn(state1, state2, state3);
 
@@ -356,7 +360,7 @@ public class FlinkKafkaConsumerBaseTest {
 		assertEquals(state2, snapshot2);
 		assertEquals(2, pendingOffsetsToCommit.size());
 		assertEquals(state2, pendingOffsetsToCommit.get(140L));
-		
+
 		// ack checkpoint 1
 		consumer.notifyCheckpointComplete(138L);
 		assertEquals(1, pendingOffsetsToCommit.size());
@@ -375,12 +379,11 @@ public class FlinkKafkaConsumerBaseTest {
 		assertEquals(state3, snapshot3);
 		assertEquals(2, pendingOffsetsToCommit.size());
 		assertEquals(state3, pendingOffsetsToCommit.get(141L));
-		
+
 		// ack checkpoint 3, subsumes number 2
 		consumer.notifyCheckpointComplete(141L);
 		assertEquals(0, pendingOffsetsToCommit.size());
 
-
 		consumer.notifyCheckpointComplete(666); // invalid checkpoint
 		assertEquals(0, pendingOffsetsToCommit.size());
 
@@ -504,7 +507,6 @@ public class FlinkKafkaConsumerBaseTest {
 		consumer.notifyCheckpointComplete(141L);
 		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
 
-
 		consumer.notifyCheckpointComplete(666); // invalid checkpoint
 		verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
 
@@ -535,8 +537,7 @@ public class FlinkKafkaConsumerBaseTest {
 	// ------------------------------------------------------------------------
 
 	private static <T> FlinkKafkaConsumerBase<T> getConsumer(
-			AbstractFetcher<T, ?> fetcher, LinkedMap pendingOffsetsToCommit, boolean running) throws Exception
-	{
+			AbstractFetcher<T, ?> fetcher, LinkedMap pendingOffsetsToCommit, boolean running) throws Exception {
 		FlinkKafkaConsumerBase<T> consumer = new DummyFlinkKafkaConsumer<>();
 		StreamingRuntimeContext mockRuntimeContext = mock(StreamingRuntimeContext.class);
 		Mockito.when(mockRuntimeContext.isCheckpointingEnabled()).thenReturn(true);

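The final hunk in this file is about brace placement rather than imports: the opening brace now closes the method signature ("throws Exception {") instead of sitting on a line of its own. A small hypothetical sketch of the conformant form (names invented, not taken from the commit):

public class BracePlacementExample {

	// Conformant style: the "{" stays on the line that ends the signature,
	// rather than being moved to a line of its own as in the removed variant.
	private static <T> T passThrough(
			T value, boolean running) throws Exception {
		if (!running) {
			throw new Exception("not running");
		}
		return value;
	}

	public static void main(String[] args) throws Exception {
		System.out.println(passThrough("ok", true));
	}
}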
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
index 6b2cc02..08c5f01 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
@@ -15,21 +15,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -53,10 +55,13 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+/**
+ * Tests for the {@link FlinkKafkaProducerBase}.
+ */
 public class FlinkKafkaProducerBaseTest {
 
 	/**
-	 * Tests that the constructor eagerly checks bootstrap servers are set in config
+	 * Tests that the constructor eagerly checks bootstrap servers are set in config.
 	 */
 	@Test(expected = IllegalArgumentException.class)
 	public void testInstantiationFailsWhenBootstrapServersMissing() throws Exception {
@@ -67,7 +72,7 @@ public class FlinkKafkaProducerBaseTest {
 	}
 
 	/**
-	 * Tests that constructor defaults to key value serializers in config to byte array deserializers if not set
+	 * Tests that the constructor defaults the key and value serializers in the config to byte array deserializers if not set.
 	 */
 	@Test
 	public void testKeyValueDeserializersSetIfMissing() throws Exception {
@@ -83,7 +88,7 @@ public class FlinkKafkaProducerBaseTest {
 	}
 
 	/**
-	 * Tests that partitions list is determinate and correctly provided to custom partitioner
+	 * Tests that partitions list is determinate and correctly provided to custom partitioner.
 	 */
 	@SuppressWarnings("unchecked")
 	@Test
@@ -93,7 +98,7 @@ public class FlinkKafkaProducerBaseTest {
 		RuntimeContext mockRuntimeContext = mock(RuntimeContext.class);
 		when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
 		when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
-		
+
 		// out-of-order list of 4 partitions
 		List<PartitionInfo> mockPartitionsList = new ArrayList<>(4);
 		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 3, null, null, null));
@@ -118,7 +123,7 @@ public class FlinkKafkaProducerBaseTest {
 	}
 
 	/**
-	 * Test ensuring that if an invoke call happens right after an async exception is caught, it should be rethrown
+	 * Test ensuring that if an invoke call happens right after an async exception is caught, it should be rethrown.
 	 */
 	@Test
 	public void testAsyncErrorRethrownOnInvoke() throws Throwable {
@@ -149,7 +154,7 @@ public class FlinkKafkaProducerBaseTest {
 	}
 
 	/**
-	 * Test ensuring that if a snapshot call happens right after an async exception is caught, it should be rethrown
+	 * Test ensuring that if a snapshot call happens right after an async exception is caught, it should be rethrown.
 	 */
 	@Test
 	public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
@@ -183,11 +188,11 @@ public class FlinkKafkaProducerBaseTest {
 	 * Test ensuring that if an async exception is caught for one of the flushed requests on checkpoint,
 	 * it should be rethrown; we set a timeout because the test will not finish if the logic is broken.
 	 *
-	 * Note that this test does not test the snapshot method is blocked correctly when there are pending recorrds.
+	 * <p>Note that this test does not test that the snapshot method is blocked correctly when there are pending records.
 	 * The test for that is covered in testAtLeastOnceProducer.
 	 */
 	@SuppressWarnings("unchecked")
-	@Test(timeout=5000)
+	@Test(timeout = 5000)
 	public void testAsyncErrorRethrownOnCheckpointAfterFlush() throws Throwable {
 		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
 			FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
@@ -237,10 +242,10 @@ public class FlinkKafkaProducerBaseTest {
 
 	/**
 	 * Test ensuring that the producer is not dropping buffered records;
-	 * we set a timeout because the test will not finish if the logic is broken
+	 * we set a timeout because the test will not finish if the logic is broken.
 	 */
 	@SuppressWarnings("unchecked")
-	@Test(timeout=10000)
+	@Test(timeout = 10000)
 	public void testAtLeastOnceProducer() throws Throwable {
 		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
 			FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
@@ -297,10 +302,10 @@ public class FlinkKafkaProducerBaseTest {
 	/**
 	 * This test is meant to assure that testAtLeastOnceProducer is valid by testing that if flushing is disabled,
 	 * the snapshot method does indeed finishes without waiting for pending records;
-	 * we set a timeout because the test will not finish if the logic is broken
+	 * we set a timeout because the test will not finish if the logic is broken.
 	 */
 	@SuppressWarnings("unchecked")
-	@Test(timeout=5000)
+	@Test(timeout = 5000)
 	public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws Throwable {
 		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
 			FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
@@ -328,8 +333,8 @@ public class FlinkKafkaProducerBaseTest {
 
 	private static class DummyFlinkKafkaProducer<T> extends FlinkKafkaProducerBase<T> {
 		private static final long serialVersionUID = 1L;
-		
-		private final static String DUMMY_TOPIC = "dummy-topic";
+
+		private static final String DUMMY_TOPIC = "dummy-topic";
 
 		private transient KafkaProducer<?, ?> mockProducer;
 		private transient List<Callback> pendingCallbacks;

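The remaining edits in this file are Javadoc and modifier conventions: every test class gets a class-level Javadoc, Javadoc sentences end with a period, continuation paragraphs start with a "<p>" tag, annotation values get spaces around "=", and modifiers are ordered "private static final" / "abstract static". A hypothetical class pulling those together (nothing below is taken from the commit):

import org.junit.Test;

import static org.junit.Assert.assertEquals;

/**
 * Tests for a hypothetical producer helper.
 *
 * <p>Continuation paragraphs start with a p tag, and every sentence ends with a period.
 */
public class ModifierOrderExampleTest {

	// "private static final", not "private final static".
	private static final String DUMMY_TOPIC = "dummy-topic";

	// Spaces around "=" inside the annotation value: timeout = 5000, not timeout=5000.
	@Test(timeout = 5000)
	public void testDummyTopicName() {
		assertEquals("dummy-topic", DUMMY_TOPIC);
	}

	// "abstract static", not "static abstract", for nested helper classes.
	private abstract static class DummyBase {
	}
}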
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
index 1882a7e..51e483b 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
@@ -14,16 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
 
+/**
+ * Tests for the {@link JSONDeserializationSchema}.
+ */
 public class JSONDeserializationSchemaTest {
 	@Test
 	public void testDeserialize() throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
index 86d3105..565ef00 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
@@ -14,16 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
 
+/**
+ * Tests for the {@link JSONKeyValueDeserializationSchema}.
+ */
 public class JSONKeyValueDeserializationSchemaTest {
 	@Test
 	public void testDeserializeWithoutMetadata() throws IOException {
@@ -39,7 +44,6 @@ public class JSONKeyValueDeserializationSchemaTest {
 		JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(false);
 		ObjectNode deserializedValue = schema.deserialize(serializedKey, serializedValue, "", 0, 0);
 
-
 		Assert.assertTrue(deserializedValue.get("metadata") == null);
 		Assert.assertEquals(4, deserializedValue.get("key").get("index").asInt());
 		Assert.assertEquals("world", deserializedValue.get("value").get("word").asText());

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
index f03feeb..186e364 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
@@ -18,12 +18,13 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
 import org.apache.flink.table.api.Types;
 import org.apache.flink.types.Row;
-import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -35,6 +36,9 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+/**
+ * Tests for the {@link JsonRowDeserializationSchema}.
+ */
 public class JsonRowDeserializationSchemaTest {
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
index 523eafe..43bde35 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
@@ -14,19 +14,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.Types;
-import org.apache.flink.types.Row;
 import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.types.Row;
+
 import org.junit.Test;
 
 import java.io.IOException;
 
 import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests for the {@link JsonRowSerializationSchema}.
+ */
 public class JsonRowSerializationSchemaTest {
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
index c24640d..0be1d57 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
@@ -25,10 +25,10 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Map;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;