You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/28 06:17:39 UTC
[09/21] flink git commit: [FLINK-6711] Activate strict checkstyle for
flink-connector-kafka*
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
index c736493..06cdf2c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
@@ -19,24 +19,25 @@
package org.apache.flink.streaming.connectors.kafka.internals;
import javax.annotation.Nullable;
+
import java.util.concurrent.atomic.AtomicReference;
/**
* A proxy that communicates exceptions between threads. Typically used if an exception
* from a spawned thread needs to be recognized by the "parent" (spawner) thread.
- *
+ *
* <p>The spawned thread would set the exception via {@link #reportError(Throwable)}.
* The parent would check (at certain points) for exceptions via {@link #checkAndThrowException()}.
* Optionally, the parent can pass itself in the constructor to be interrupted as soon as
* an exception occurs.
- *
+ *
* <pre>
* {@code
- *
+ *
* final ExceptionProxy errorProxy = new ExceptionProxy(Thread.currentThread());
- *
+ *
* Thread subThread = new Thread() {
- *
+ *
* public void run() {
* try {
* doSomething();
@@ -48,13 +49,13 @@ import java.util.concurrent.atomic.AtomicReference;
* }
* };
* subThread.start();
- *
+ *
* doSomethingElse();
* errorProxy.checkAndThrowException();
- *
+ *
* doSomethingMore();
* errorProxy.checkAndThrowException();
- *
+ *
* try {
* subThread.join();
* } catch (InterruptedException e) {
@@ -66,33 +67,33 @@ import java.util.concurrent.atomic.AtomicReference;
* </pre>
*/
public class ExceptionProxy {
-
- /** The thread that should be interrupted when an exception occurs */
+
+ /** The thread that should be interrupted when an exception occurs. */
private final Thread toInterrupt;
-
- /** The exception to throw */
+
+ /** The exception to throw. */
private final AtomicReference<Throwable> exception;
/**
* Creates an exception proxy that interrupts the given thread upon
* report of an exception. The thread to interrupt may be null.
- *
+ *
* @param toInterrupt The thread to interrupt upon an exception. May be null.
*/
public ExceptionProxy(@Nullable Thread toInterrupt) {
this.toInterrupt = toInterrupt;
this.exception = new AtomicReference<>();
}
-
+
// ------------------------------------------------------------------------
-
+
/**
* Sets the exception and interrupts the target thread,
* if no other exception has occurred so far.
- *
+ *
* <p>The exception is only set (and the interruption is only triggered),
* if no other exception was set before.
- *
+ *
* @param t The exception that occurred
*/
public void reportError(Throwable t) {
@@ -105,7 +106,7 @@ public class ExceptionProxy {
/**
* Checks whether an exception has been set via {@link #reportError(Throwable)}.
* If yes, that exception if re-thrown by this method.
- *
+ *
* @throws Exception This method re-throws the exception, if set.
*/
public void checkAndThrowException() throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
index c68fe28..f3645e3 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
@@ -27,16 +27,16 @@ import static java.util.Objects.requireNonNull;
/**
* Flink's description of a partition in a Kafka topic.
* Serializable, and common across all Kafka consumer subclasses (0.8, 0.9, ...)
- *
+ *
* <p>Note: This class must not change in its structure, because it would change the
* serialization format and make previous savepoints unreadable.
*/
public final class KafkaTopicPartition implements Serializable {
/** THIS SERIAL VERSION UID MUST NOT CHANGE, BECAUSE IT WOULD BREAK
- * READING OLD SERIALIZED INSTANCES FROM SAVEPOINTS */
+ * READING OLD SERIALIZED INSTANCES FROM SAVEPOINTS. */
private static final long serialVersionUID = 722083576322742325L;
-
+
// ------------------------------------------------------------------------
private final String topic;
@@ -50,7 +50,7 @@ public final class KafkaTopicPartition implements Serializable {
}
// ------------------------------------------------------------------------
-
+
public String getTopic() {
return topic;
}
@@ -60,7 +60,7 @@ public final class KafkaTopicPartition implements Serializable {
}
// ------------------------------------------------------------------------
-
+
@Override
public String toString() {
return "KafkaTopicPartition{" +
@@ -87,7 +87,7 @@ public final class KafkaTopicPartition implements Serializable {
public int hashCode() {
return cachedHash;
}
-
+
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
@@ -109,10 +109,9 @@ public final class KafkaTopicPartition implements Serializable {
return sb.toString();
}
-
public static List<KafkaTopicPartition> dropLeaderData(List<KafkaTopicPartitionLeader> partitionInfos) {
List<KafkaTopicPartition> ret = new ArrayList<>(partitionInfos.size());
- for(KafkaTopicPartitionLeader ktpl: partitionInfos) {
+ for (KafkaTopicPartitionLeader ktpl: partitionInfos) {
ret.add(ktpl.getTopicPartition());
}
return ret;
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
index adfbf79..78ab612 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionState.java
@@ -20,31 +20,31 @@ package org.apache.flink.streaming.connectors.kafka.internals;
/**
* The state that the Flink Kafka Consumer holds for each Kafka partition.
* Includes the Kafka descriptor for partitions.
- *
+ *
* <p>This class describes the most basic state (only the offset), subclasses
* define more elaborate state, containing current watermarks and timestamp
* extractors.
- *
+ *
* @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions.
*/
public class KafkaTopicPartitionState<KPH> {
-
+
// ------------------------------------------------------------------------
- /** The Flink description of a Kafka partition */
+ /** The Flink description of a Kafka partition. */
private final KafkaTopicPartition partition;
- /** The Kafka description of a Kafka partition (varies across different Kafka versions) */
+ /** The Kafka description of a Kafka partition (varies across different Kafka versions). */
private final KPH kafkaPartitionHandle;
-
- /** The offset within the Kafka partition that we already processed */
+
+ /** The offset within the Kafka partition that we already processed. */
private volatile long offset;
- /** The offset of the Kafka partition that has been committed */
+ /** The offset of the Kafka partition that has been committed. */
private volatile long committedOffset;
// ------------------------------------------------------------------------
-
+
public KafkaTopicPartitionState(KafkaTopicPartition partition, KPH kafkaPartitionHandle) {
this.partition = partition;
this.kafkaPartitionHandle = kafkaPartitionHandle;
@@ -103,7 +103,6 @@ public class KafkaTopicPartitionState<KPH> {
return committedOffset;
}
-
// ------------------------------------------------------------------------
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
index 153a326..c218618 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateSentinel.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka.internals;
/**
* Magic values used to represent special offset states before partitions are actually read.
*
- * The values are all negative. Negative offsets are not used by Kafka (invalid), so we
+ * <p>The values are all negative. Negative offsets are not used by Kafka (invalid), so we
* pick a number that is probably (hopefully) not used by Kafka as a magic number for anything else.
*/
public class KafkaTopicPartitionStateSentinel {
@@ -31,7 +31,7 @@ public class KafkaTopicPartitionStateSentinel {
/**
* Magic number that defines the partition should start from the earliest offset.
*
- * This is used as a placeholder so that the actual earliest offset can be evaluated lazily
+ * <p>This is used as a placeholder so that the actual earliest offset can be evaluated lazily
* when the partition will actually start to be read by the consumer.
*/
public static final long EARLIEST_OFFSET = -915623761775L;
@@ -39,7 +39,7 @@ public class KafkaTopicPartitionStateSentinel {
/**
* Magic number that defines the partition should start from the latest offset.
*
- * This is used as a placeholder so that the actual latest offset can be evaluated lazily
+ * <p>This is used as a placeholder so that the actual latest offset can be evaluated lazily
* when the partition will actually start to be read by the consumer.
*/
public static final long LATEST_OFFSET = -915623761774L;
@@ -47,7 +47,7 @@ public class KafkaTopicPartitionStateSentinel {
/**
* Magic number that defines the partition should start from its committed group offset in Kafka.
*
- * This is used as a placeholder so that the actual committed group offset can be evaluated lazily
+ * <p>This is used as a placeholder so that the actual committed group offset can be evaluated lazily
* when the partition will actually start to be read by the consumer.
*/
public static final long GROUP_OFFSET = -915623761773L;
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
index efdc73f..5116e9f 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
@@ -23,36 +23,35 @@ import org.apache.flink.streaming.api.watermark.Watermark;
/**
* A special version of the per-kafka-partition-state that additionally holds
* a periodic watermark generator (and timestamp extractor) per partition.
- *
+ *
* @param <T> The type of records handled by the watermark generator
* @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions.
*/
public final class KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> extends KafkaTopicPartitionState<KPH> {
-
- /** The timestamp assigner and watermark generator for the partition */
+
+ /** The timestamp assigner and watermark generator for the partition. */
private final AssignerWithPeriodicWatermarks<T> timestampsAndWatermarks;
-
- /** The last watermark timestamp generated by this partition */
+
+ /** The last watermark timestamp generated by this partition. */
private long partitionWatermark;
// ------------------------------------------------------------------------
-
+
public KafkaTopicPartitionStateWithPeriodicWatermarks(
KafkaTopicPartition partition, KPH kafkaPartitionHandle,
- AssignerWithPeriodicWatermarks<T> timestampsAndWatermarks)
- {
+ AssignerWithPeriodicWatermarks<T> timestampsAndWatermarks) {
super(partition, kafkaPartitionHandle);
-
+
this.timestampsAndWatermarks = timestampsAndWatermarks;
this.partitionWatermark = Long.MIN_VALUE;
}
// ------------------------------------------------------------------------
-
+
public long getTimestampForRecord(T record, long kafkaEventTimestamp) {
return timestampsAndWatermarks.extractTimestamp(record, kafkaEventTimestamp);
}
-
+
public long getCurrentWatermarkTimestamp() {
Watermark wm = timestampsAndWatermarks.getCurrentWatermark();
if (wm != null) {
@@ -62,7 +61,7 @@ public final class KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> extend
}
// ------------------------------------------------------------------------
-
+
@Override
public String toString() {
return "KafkaTopicPartitionStateWithPeriodicWatermarks: partition=" + getKafkaTopicPartition()
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
index edf40ce..f4a80a4 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
@@ -25,35 +25,34 @@ import javax.annotation.Nullable;
/**
* A special version of the per-kafka-partition-state that additionally holds
* a periodic watermark generator (and timestamp extractor) per partition.
- *
+ *
* <p>This class is not thread safe, but it gives volatile access to the current
* partition watermark ({@link #getCurrentPartitionWatermark()}).
- *
+ *
* @param <T> The type of records handled by the watermark generator
* @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions
*/
public final class KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> extends KafkaTopicPartitionState<KPH> {
-
- /** The timestamp assigner and watermark generator for the partition */
+
+ /** The timestamp assigner and watermark generator for the partition. */
private final AssignerWithPunctuatedWatermarks<T> timestampsAndWatermarks;
-
- /** The last watermark timestamp generated by this partition */
+
+ /** The last watermark timestamp generated by this partition. */
private volatile long partitionWatermark;
// ------------------------------------------------------------------------
-
+
public KafkaTopicPartitionStateWithPunctuatedWatermarks(
KafkaTopicPartition partition, KPH kafkaPartitionHandle,
- AssignerWithPunctuatedWatermarks<T> timestampsAndWatermarks)
- {
+ AssignerWithPunctuatedWatermarks<T> timestampsAndWatermarks) {
super(partition, kafkaPartitionHandle);
-
+
this.timestampsAndWatermarks = timestampsAndWatermarks;
this.partitionWatermark = Long.MIN_VALUE;
}
// ------------------------------------------------------------------------
-
+
public long getTimestampForRecord(T record, long kafkaEventTimestamp) {
return timestampsAndWatermarks.extractTimestamp(record, kafkaEventTimestamp);
}
@@ -69,13 +68,13 @@ public final class KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> exte
return null;
}
}
-
+
public long getCurrentPartitionWatermark() {
return partitionWatermark;
}
// ------------------------------------------------------------------------
-
+
@Override
public String toString() {
return "KafkaTopicPartitionStateWithPunctuatedWatermarks: partition=" + getKafkaTopicPartition()
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
index e47c667..6ed3717 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.kafka.partitioner;
import org.apache.flink.util.Preconditions;
@@ -22,9 +23,9 @@ import org.apache.flink.util.Preconditions;
/**
* A partitioner ensuring that each internal Flink partition ends up in one Kafka partition.
*
- * Note, one Kafka partition can contain multiple Flink partitions.
+ * <p>Note, one Kafka partition can contain multiple Flink partitions.
*
- * Cases:
+ * <p>Cases:
* # More Flink partitions than kafka partitions
* <pre>
* Flink Sinks: Kafka Partitions
@@ -35,7 +36,7 @@ import org.apache.flink.util.Preconditions;
* </pre>
* Some (or all) kafka partitions contain the output of more than one flink partition
*
- *# Fewer Flink partitions than Kafka
+ * <p>Fewer Flink partitions than Kafka
* <pre>
* Flink Sinks: Kafka Partitions
* 1 ----------------> 1
@@ -45,9 +46,9 @@ import org.apache.flink.util.Preconditions;
* 5
* </pre>
*
- * Not all Kafka partitions contain data
- * To avoid such an unbalanced partitioning, use a round-robin kafka partitioner (note that this will
- * cause a lot of network connections between all the Flink instances and all the Kafka brokers).
+ * <p>Not all Kafka partitions contain data
+ * To avoid such an unbalanced partitioning, use a round-robin kafka partitioner (note that this will
+ * cause a lot of network connections between all the Flink instances and all the Kafka brokers).
*/
public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
@@ -60,13 +61,13 @@ public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
this.parallelInstanceId = parallelInstanceId;
}
-
+
@Override
public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
Preconditions.checkArgument(
partitions != null && partitions.length > 0,
"Partitions of the target topic is empty.");
-
+
return partitions[parallelInstanceId % partitions.length];
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
index b7b4143..168e76b 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.kafka.partitioner;
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
index 37241f5..0713738 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
@@ -14,11 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.util.serialization;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.util.List;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
@@ -28,13 +29,15 @@ import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.util.Utf8;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
/**
* Deserialization schema from Avro bytes over {@link SpecificRecord} to {@link Row}.
*
- * Deserializes the <code>byte[]</code> messages into (nested) Flink Rows.
+ * <p>Deserializes the <code>byte[]</code> messages into (nested) Flink Rows.
*
* {@link Utf8} is converted to regular Java Strings.
*/
@@ -56,7 +59,7 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
private final MutableByteArrayInputStream inputStream;
/**
- * Avro decoder that decodes binary data
+ * Avro decoder that decodes binary data.
*/
private final Decoder decoder;
@@ -133,9 +136,7 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
* InputStream instance, copying message to process, and creation of Decoder on every new message.
*/
private static final class MutableByteArrayInputStream extends ByteArrayInputStream {
- /**
- * Create MutableByteArrayInputStream
- */
+
public MutableByteArrayInputStream() {
super(new byte[0]);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
index 8388ab5..450c78f 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
@@ -15,11 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.util.serialization;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.List;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
@@ -30,8 +31,10 @@ import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.util.Utf8;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
/**
* Serialization schema that serializes {@link Row} over {@link SpecificRecord} into a Avro bytes.
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
index d170058..095e964 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.util.serialization;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -21,11 +22,10 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
-
/**
* DeserializationSchema that deserializes a JSON String into an ObjectNode.
- * <p>
- * Fields can be accessed by calling objectNode.get(<name>).as(<type>)
+ *
+ * <p>Fields can be accessed by calling objectNode.get(<name>).as(<type>)
*/
public class JSONDeserializationSchema extends AbstractDeserializationSchema<ObjectNode> {
private ObjectMapper mapper;
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
index 261a111..f75df0c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java
@@ -14,12 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.util.serialization;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.io.IOException;
@@ -27,12 +29,12 @@ import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;
/**
* DeserializationSchema that deserializes a JSON String into an ObjectNode.
- * <p>
- * Key fields can be accessed by calling objectNode.get("key").get(<name>).as(<type>)
- * <p>
- * Value fields can be accessed by calling objectNode.get("value").get(<name>).as(<type>)
- * <p>
- * Metadata fields can be accessed by calling objectNode.get("metadata").get(<name>).as(<type>) and include
+ *
+ * <p>Key fields can be accessed by calling objectNode.get("key").get(<name>).as(<type>)
+ *
+ * <p>Value fields can be accessed by calling objectNode.get("value").get(<name>).as(<type>)
+ *
+ * <p>Metadata fields can be accessed by calling objectNode.get("metadata").get(<name>).as(<type>) and include
* the "offset" (long), "topic" (String) and "partition" (int).
*/
public class JSONKeyValueDeserializationSchema implements KeyedDeserializationSchema<ObjectNode> {
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
index be201fa..f335c30 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java
@@ -18,14 +18,16 @@
package org.apache.flink.streaming.util.serialization;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.IOException;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+
/**
* Deserialization schema from JSON to {@link Row}.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
index 1998aa6..a3fa379 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
@@ -14,14 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.util.serialization;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* Serialization schema that serializes an object into a JSON bytes.
@@ -33,9 +34,9 @@ import org.apache.flink.util.Preconditions;
* {@link JsonRowDeserializationSchema}.
*/
public class JsonRowSerializationSchema implements SerializationSchema<Row> {
- /** Fields names in the input Row object */
+ /** Fields names in the input Row object. */
private final String[] fieldNames;
- /** Object mapper that is used to create output JSON objects */
+ /** Object mapper that is used to create output JSON objects. */
private static ObjectMapper mapper = new ObjectMapper();
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
index b5a33bc..234a96d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
@@ -26,7 +26,7 @@ import java.io.Serializable;
* The deserialization schema describes how to turn the byte key / value messages delivered by certain
* data sources (for example Apache Kafka) into data types (Java/Scala objects) that are
* processed by Flink.
- *
+ *
* @param <T> The type created by the keyed deserialization schema.
*/
public interface KeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
@@ -46,7 +46,7 @@ public interface KeyedDeserializationSchema<T> extends Serializable, ResultTypeQ
/**
* Method to decide whether the element signals the end of the stream. If
* true is returned the element won't be emitted.
- *
+ *
* @param nextElement The element to test for the end-of-stream signal.
*
* @return True, if the element signals end of stream, false otherwise.
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
index 4b9dba2..e128aba 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.util.serialization;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -22,7 +23,7 @@ import java.io.IOException;
/**
* A simple wrapper for using the DeserializationSchema with the KeyedDeserializationSchema
- * interface
+ * interface.
* @param <T> The type created by the deserialization schema.
*/
public class KeyedDeserializationSchemaWrapper<T> implements KeyedDeserializationSchema<T> {
@@ -34,6 +35,7 @@ public class KeyedDeserializationSchemaWrapper<T> implements KeyedDeserializatio
public KeyedDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema) {
this.deserializationSchema = deserializationSchema;
}
+
@Override
public T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
return deserializationSchema.deserialize(message);
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
index 701281e..12bcab9 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.util.serialization;
import java.io.Serializable;
@@ -22,7 +23,7 @@ import java.io.Serializable;
* The serialization schema describes how to turn a data object into a different serialized
* representation. Most data sinks (for example Apache Kafka) require the data to be handed
* to them in a specific format (for example as byte strings).
- *
+ *
* @param <T> The type to be serialized.
*/
public interface KeyedSerializationSchema<T> extends Serializable {
@@ -36,17 +37,16 @@ public interface KeyedSerializationSchema<T> extends Serializable {
*/
byte[] serializeKey(T element);
-
/**
- * Serializes the value of the incoming element to a byte array
- *
+ * Serializes the value of the incoming element to a byte array.
+ *
* @param element The incoming element to be serialized
* @return the value of the element as a byte array
*/
byte[] serializeValue(T element);
/**
- * Optional method to determine the target topic for the element
+ * Optional method to determine the target topic for the element.
*
* @param element Incoming element to determine the target topic from
* @return null or the target topic
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
index 1b3e486..0a181d1 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
@@ -14,11 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.util.serialization;
/**
* A simple wrapper for using the SerializationSchema with the KeyedDeserializationSchema
- * interface
+ * interface.
* @param <T> The type to serialize
*/
public class KeyedSerializationSchemaWrapper<T> implements KeyedSerializationSchema<T> {
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
index 51bc8d1..3e0cdb5 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
@@ -32,30 +32,29 @@ import java.io.IOException;
/**
* A serialization and deserialization schema for Key Value Pairs that uses Flink's serialization stack to
* transform typed from and to byte arrays.
- *
+ *
* @param <K> The key type to be serialized.
* @param <V> The value type to be serialized.
*/
-public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDeserializationSchema<Tuple2<K, V>>, KeyedSerializationSchema<Tuple2<K,V>> {
+public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDeserializationSchema<Tuple2<K, V>>, KeyedSerializationSchema<Tuple2<K, V>> {
private static final long serialVersionUID = -5359448468131559102L;
- /** The serializer for the key */
+ /** The serializer for the key. */
private final TypeSerializer<K> keySerializer;
- /** The serializer for the value */
+ /** The serializer for the value. */
private final TypeSerializer<V> valueSerializer;
- /** reusable input deserialization buffer */
+ /** reusable input deserialization buffer. */
private final DataInputDeserializer inputDeserializer;
-
- /** reusable output serialization buffer for the key */
+
+ /** reusable output serialization buffer for the key. */
private transient DataOutputSerializer keyOutputSerializer;
- /** reusable output serialization buffer for the value */
+ /** reusable output serialization buffer for the value. */
private transient DataOutputSerializer valueOutputSerializer;
-
-
+
/** The type information, to be returned by {@link #getProducedType()}. It is
* transient, because it is not serializable. Note that this means that the type information
* is not available at runtime, but only prior to the first serialization / deserialization */
@@ -80,10 +79,10 @@ public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDe
/**
* Creates a new de-/serialization schema for the given types. This constructor accepts the types
* as classes and internally constructs the type information from the classes.
- *
+ *
* <p>If the types are parametrized and cannot be fully defined via classes, use the constructor
* that accepts {@link TypeInformation} instead.
- *
+ *
* @param keyClass The class of the key de-/serialized by this schema.
* @param valueClass The class of the value de-/serialized by this schema.
* @param config The execution config, which is used to parametrize the type serializers.
@@ -94,12 +93,11 @@ public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDe
// ------------------------------------------------------------------------
-
@Override
public Tuple2<K, V> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
K key = null;
V value = null;
-
+
if (messageKey != null) {
inputDeserializer.setBuffer(messageKey, 0, messageKey.length);
key = keySerializer.deserialize(inputDeserializer);
@@ -117,11 +115,10 @@ public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDe
* @return Returns false.
*/
@Override
- public boolean isEndOfStream(Tuple2<K,V> nextElement) {
+ public boolean isEndOfStream(Tuple2<K, V> nextElement) {
return false;
}
-
@Override
public byte[] serializeKey(Tuple2<K, V> element) {
if (element.f0 == null) {
@@ -182,9 +179,8 @@ public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDe
return null; // we are never overriding the topic
}
-
@Override
- public TypeInformation<Tuple2<K,V>> getProducedType() {
+ public TypeInformation<Tuple2<K, V>> getProducedType() {
if (typeInfo != null) {
return typeInfo;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
index e13968e..d5be274 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
@@ -15,18 +15,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.kafka;
-import java.io.IOException;
-import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.connectors.kafka.testutils.AvroTestUtils;
import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
import org.apache.flink.streaming.util.serialization.AvroRowSerializationSchema;
import org.apache.flink.types.Row;
-import static org.junit.Assert.assertEquals;
+
+import org.apache.avro.specific.SpecificRecord;
import org.junit.Test;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
/**
* Test for the Avro serialization and deserialization schema.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkFixedPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkFixedPartitionerTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkFixedPartitionerTest.java
new file mode 100644
index 0000000..b62bdd5
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkFixedPartitionerTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for the {@link FlinkFixedPartitioner}.
+ */
+public class FlinkFixedPartitionerTest {
+
+ /**
+ * Test for when there are more sinks than partitions.
+ * <pre>
+ * Flink Sinks: Kafka Partitions
+ * 1 ----------------> 1
+ * 2 --------------/
+ * 3 -------------/
+ * 4 ------------/
+ * </pre>
+ */
+ @Test
+ public void testMoreFlinkThanBrokers() {
+ FlinkFixedPartitioner<String> part = new FlinkFixedPartitioner<>();
+
+ int[] partitions = new int[]{0};
+
+ part.open(0, 4);
+ Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+ part.open(1, 4);
+ Assert.assertEquals(0, part.partition("abc2", null, null, null, partitions));
+
+ part.open(2, 4);
+ Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions));
+ Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions)); // check if it is changing ;)
+
+ part.open(3, 4);
+ Assert.assertEquals(0, part.partition("abc4", null, null, null, partitions));
+ }
+
+ /**
+ * Tests for when there are more partitions than sinks.
+ * <pre>
+ * Flink Sinks: Kafka Partitions
+ * 1 ----------------> 1
+ * 2 ----------------> 2
+ * 3
+ * 4
+ * 5
+ *
+ * </pre>
+ */
+ @Test
+ public void testFewerPartitions() {
+ FlinkFixedPartitioner<String> part = new FlinkFixedPartitioner<>();
+
+ int[] partitions = new int[]{0, 1, 2, 3, 4};
+ part.open(0, 2);
+ Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+ Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+ part.open(1, 2);
+ Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
+ Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
+ }
+
+ /*
+ * Flink Sinks: Kafka Partitions
+ * 1 ------------>---> 1
+ * 2 -----------/----> 2
+ * 3 ----------/
+ */
+ @Test
+ public void testMixedCase() {
+ FlinkFixedPartitioner<String> part = new FlinkFixedPartitioner<>();
+ int[] partitions = new int[]{0, 1};
+
+ part.open(0, 3);
+ Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+ part.open(1, 3);
+ Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
+
+ part.open(2, 3);
+ Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java
index c07ebd5..77417ab 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -29,6 +30,7 @@ import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.SerializedValue;
+
import org.junit.Assert;
import org.junit.Test;
@@ -36,9 +38,9 @@ import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.Map;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import static org.mockito.Mockito.mock;
@@ -316,7 +318,6 @@ public class FlinkKafkaConsumerBaseFrom11MigrationTest {
}
}
-
// ------------------------------------------------------------------------
private interface FetcherFactory<T> extends Serializable {
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom12MigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom12MigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom12MigrationTest.java
index f11bf9f..f13cbe0 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom12MigrationTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom12MigrationTest.java
@@ -15,19 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.connectors.kafka;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+package org.apache.flink.streaming.connectors.kafka;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
@@ -45,12 +35,25 @@ import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OperatorSnapshotUtil;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.SerializedValue;
+
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
/**
* Tests for checking whether {@link FlinkKafkaConsumerBase} can restore from snapshots that were
* done using the Flink 1.2 {@link FlinkKafkaConsumerBase}.
@@ -60,7 +63,7 @@ import org.mockito.stubbing.Answer;
*/
public class FlinkKafkaConsumerBaseFrom12MigrationTest {
- final static HashMap<KafkaTopicPartition, Long> PARTITION_STATE = new HashMap<>();
+ private static final HashMap<KafkaTopicPartition, Long> PARTITION_STATE = new HashMap<>();
static {
PARTITION_STATE.put(new KafkaTopicPartition("abc", 13), 16768L);
@@ -101,7 +104,6 @@ public class FlinkKafkaConsumerBaseFrom12MigrationTest {
StreamSource<String, DummyFlinkKafkaConsumer<String>> consumerOperator =
new StreamSource<>(consumerFunction);
-
final AbstractStreamOperatorTestHarness<String> testHarness =
new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0);
@@ -307,8 +309,7 @@ public class FlinkKafkaConsumerBaseFrom12MigrationTest {
}
}
-
- private static abstract class DummySourceContext
+ private abstract static class DummySourceContext
implements SourceFunction.SourceContext<String> {
private final Object lock = new Object();
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index ccf2ed2..d673e8e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.commons.collections.map.LinkedMap;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
@@ -35,6 +34,8 @@ import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.SerializedValue;
+
+import org.apache.commons.collections.map.LinkedMap;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
@@ -61,6 +62,9 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+/**
+ * Tests for the {@link FlinkKafkaConsumerBase}.
+ */
public class FlinkKafkaConsumerBaseTest {
/**
@@ -77,12 +81,12 @@ public class FlinkKafkaConsumerBaseTest {
new DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks<Object>) null);
fail();
} catch (NullPointerException ignored) {}
-
+
@SuppressWarnings("unchecked")
final AssignerWithPeriodicWatermarks<String> periodicAssigner = mock(AssignerWithPeriodicWatermarks.class);
@SuppressWarnings("unchecked")
final AssignerWithPunctuatedWatermarks<String> punctuatedAssigner = mock(AssignerWithPunctuatedWatermarks.class);
-
+
DummyFlinkKafkaConsumer<String> c1 = new DummyFlinkKafkaConsumer<>();
c1.assignTimestampsAndWatermarks(periodicAssigner);
try {
@@ -189,7 +193,7 @@ public class FlinkKafkaConsumerBaseTest {
}
/**
- * Tests that on snapshots, states and offsets to commit to Kafka are correct
+ * Tests that on snapshots, states and offsets to commit to Kafka are correct.
*/
@SuppressWarnings("unchecked")
@Test
@@ -301,7 +305,7 @@ public class FlinkKafkaConsumerBaseTest {
state3.put(new KafkaTopicPartition("def", 7), 987654377L);
// --------------------------------------------------------------------
-
+
final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
when(fetcher.snapshotCurrentState()).thenReturn(state1, state2, state3);
@@ -356,7 +360,7 @@ public class FlinkKafkaConsumerBaseTest {
assertEquals(state2, snapshot2);
assertEquals(2, pendingOffsetsToCommit.size());
assertEquals(state2, pendingOffsetsToCommit.get(140L));
-
+
// ack checkpoint 1
consumer.notifyCheckpointComplete(138L);
assertEquals(1, pendingOffsetsToCommit.size());
@@ -375,12 +379,11 @@ public class FlinkKafkaConsumerBaseTest {
assertEquals(state3, snapshot3);
assertEquals(2, pendingOffsetsToCommit.size());
assertEquals(state3, pendingOffsetsToCommit.get(141L));
-
+
// ack checkpoint 3, subsumes number 2
consumer.notifyCheckpointComplete(141L);
assertEquals(0, pendingOffsetsToCommit.size());
-
consumer.notifyCheckpointComplete(666); // invalid checkpoint
assertEquals(0, pendingOffsetsToCommit.size());
@@ -504,7 +507,6 @@ public class FlinkKafkaConsumerBaseTest {
consumer.notifyCheckpointComplete(141L);
verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
-
consumer.notifyCheckpointComplete(666); // invalid checkpoint
verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
@@ -535,8 +537,7 @@ public class FlinkKafkaConsumerBaseTest {
// ------------------------------------------------------------------------
private static <T> FlinkKafkaConsumerBase<T> getConsumer(
- AbstractFetcher<T, ?> fetcher, LinkedMap pendingOffsetsToCommit, boolean running) throws Exception
- {
+ AbstractFetcher<T, ?> fetcher, LinkedMap pendingOffsetsToCommit, boolean running) throws Exception {
FlinkKafkaConsumerBase<T> consumer = new DummyFlinkKafkaConsumer<>();
StreamingRuntimeContext mockRuntimeContext = mock(StreamingRuntimeContext.class);
Mockito.when(mockRuntimeContext.isCheckpointingEnabled()).thenReturn(true);
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
index 6b2cc02..08c5f01 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
@@ -15,21 +15,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.MultiShotLatch;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -53,10 +55,13 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+/**
+ * Tests for the {@link FlinkKafkaProducerBase}.
+ */
public class FlinkKafkaProducerBaseTest {
/**
- * Tests that the constructor eagerly checks bootstrap servers are set in config
+ * Tests that the constructor eagerly checks bootstrap servers are set in config.
*/
@Test(expected = IllegalArgumentException.class)
public void testInstantiationFailsWhenBootstrapServersMissing() throws Exception {
@@ -67,7 +72,7 @@ public class FlinkKafkaProducerBaseTest {
}
/**
- * Tests that constructor defaults to key value serializers in config to byte array deserializers if not set
+ * Tests that constructor defaults to key value serializers in config to byte array deserializers if not set.
*/
@Test
public void testKeyValueDeserializersSetIfMissing() throws Exception {
@@ -83,7 +88,7 @@ public class FlinkKafkaProducerBaseTest {
}
/**
- * Tests that partitions list is determinate and correctly provided to custom partitioner
+ * Tests that partitions list is determinate and correctly provided to custom partitioner.
*/
@SuppressWarnings("unchecked")
@Test
@@ -93,7 +98,7 @@ public class FlinkKafkaProducerBaseTest {
RuntimeContext mockRuntimeContext = mock(RuntimeContext.class);
when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
-
+
// out-of-order list of 4 partitions
List<PartitionInfo> mockPartitionsList = new ArrayList<>(4);
mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 3, null, null, null));
@@ -118,7 +123,7 @@ public class FlinkKafkaProducerBaseTest {
}
/**
- * Test ensuring that if an invoke call happens right after an async exception is caught, it should be rethrown
+ * Test ensuring that if an invoke call happens right after an async exception is caught, it should be rethrown.
*/
@Test
public void testAsyncErrorRethrownOnInvoke() throws Throwable {
@@ -149,7 +154,7 @@ public class FlinkKafkaProducerBaseTest {
}
/**
- * Test ensuring that if a snapshot call happens right after an async exception is caught, it should be rethrown
+ * Test ensuring that if a snapshot call happens right after an async exception is caught, it should be rethrown.
*/
@Test
public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
@@ -183,11 +188,11 @@ public class FlinkKafkaProducerBaseTest {
* Test ensuring that if an async exception is caught for one of the flushed requests on checkpoint,
* it should be rethrown; we set a timeout because the test will not finish if the logic is broken.
*
- * Note that this test does not test the snapshot method is blocked correctly when there are pending recorrds.
+ * <p>Note that this test does not test the snapshot method is blocked correctly when there are pending recorrds.
* The test for that is covered in testAtLeastOnceProducer.
*/
@SuppressWarnings("unchecked")
- @Test(timeout=5000)
+ @Test(timeout = 5000)
public void testAsyncErrorRethrownOnCheckpointAfterFlush() throws Throwable {
final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
@@ -237,10 +242,10 @@ public class FlinkKafkaProducerBaseTest {
/**
* Test ensuring that the producer is not dropping buffered records;
- * we set a timeout because the test will not finish if the logic is broken
+ * we set a timeout because the test will not finish if the logic is broken.
*/
@SuppressWarnings("unchecked")
- @Test(timeout=10000)
+ @Test(timeout = 10000)
public void testAtLeastOnceProducer() throws Throwable {
final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
@@ -297,10 +302,10 @@ public class FlinkKafkaProducerBaseTest {
/**
* This test is meant to assure that testAtLeastOnceProducer is valid by testing that if flushing is disabled,
* the snapshot method does indeed finishes without waiting for pending records;
- * we set a timeout because the test will not finish if the logic is broken
+ * we set a timeout because the test will not finish if the logic is broken.
*/
@SuppressWarnings("unchecked")
- @Test(timeout=5000)
+ @Test(timeout = 5000)
public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws Throwable {
final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
@@ -328,8 +333,8 @@ public class FlinkKafkaProducerBaseTest {
private static class DummyFlinkKafkaProducer<T> extends FlinkKafkaProducerBase<T> {
private static final long serialVersionUID = 1L;
-
- private final static String DUMMY_TOPIC = "dummy-topic";
+
+ private static final String DUMMY_TOPIC = "dummy-topic";
private transient KafkaProducer<?, ?> mockProducer;
private transient List<Callback> pendingCallbacks;
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
index 1882a7e..51e483b 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONDeserializationSchemaTest.java
@@ -14,16 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.kafka;
+import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema;
+
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
+/**
+ * Tests for the {@link JSONDeserializationSchema}.
+ */
public class JSONDeserializationSchemaTest {
@Test
public void testDeserialize() throws IOException {
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
index 86d3105..565ef00 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java
@@ -14,16 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.kafka;
+import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
+
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
+/**
+ * Tests for the{@link JSONKeyValueDeserializationSchema}.
+ */
public class JSONKeyValueDeserializationSchemaTest {
@Test
public void testDeserializeWithoutMetadata() throws IOException {
@@ -39,7 +44,6 @@ public class JSONKeyValueDeserializationSchemaTest {
JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(false);
ObjectNode deserializedValue = schema.deserialize(serializedKey, serializedValue, "", 0, 0);
-
Assert.assertTrue(deserializedValue.get("metadata") == null);
Assert.assertEquals(4, deserializedValue.get("key").get("index").asInt());
Assert.assertEquals("world", deserializedValue.get("value").get("word").asText());
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
index f03feeb..186e364 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowDeserializationSchemaTest.java
@@ -18,12 +18,13 @@
package org.apache.flink.streaming.connectors.kafka;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.types.Row;
-import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import org.junit.Test;
import java.io.IOException;
@@ -35,6 +36,9 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+/**
+ * Tests for the {@link JsonRowDeserializationSchema}.
+ */
public class JsonRowDeserializationSchemaTest {
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
index 523eafe..43bde35 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
@@ -14,19 +14,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.Types;
-import org.apache.flink.types.Row;
import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.types.Row;
+
import org.junit.Test;
import java.io.IOException;
import static org.junit.Assert.assertEquals;
+/**
+ * Tests for the {@link JsonRowSerializationSchema}.
+ */
public class JsonRowSerializationSchemaTest {
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
index c24640d..0be1d57 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
@@ -25,10 +25,10 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Map;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import static org.junit.Assert.assertEquals;