Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/28 06:17:42 UTC
[12/21] flink git commit: [FLINK-6711] Activate strict checkstyle for flink-connector-kafka*
[FLINK-6711] Activate strict checkstyle for flink-connector-kafka*
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28e8043b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28e8043b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28e8043b
Branch: refs/heads/master
Commit: 28e8043ba09b47c99439fdb536a4226eccf70c07
Parents: c20b396
Author: zentol <ch...@apache.org>
Authored: Wed May 24 23:55:15 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sun May 28 00:11:17 2017 +0200
----------------------------------------------------------------------
.../flink-connector-kafka-0.10/pom.xml | 4 +-
.../connectors/kafka/FlinkKafkaConsumer010.java | 19 +-
.../connectors/kafka/FlinkKafkaProducer010.java | 60 ++---
.../kafka/Kafka010AvroTableSource.java | 8 +-
.../kafka/Kafka010JsonTableSource.java | 4 +-
.../connectors/kafka/Kafka010TableSource.java | 4 +-
.../kafka/internal/Kafka010Fetcher.java | 9 +-
.../internal/KafkaConsumerCallBridge010.java | 8 +-
.../src/main/resources/log4j.properties | 1 -
.../kafka/Kafka010AvroTableSourceTest.java | 6 +-
.../connectors/kafka/Kafka010FetcherTest.java | 3 -
.../connectors/kafka/Kafka010ITCase.java | 30 ++-
.../kafka/Kafka010JsonTableSourceTest.java | 6 +-
.../kafka/Kafka010ProducerITCase.java | 5 +-
.../kafka/KafkaTestEnvironmentImpl.java | 41 ++-
.../flink-connector-kafka-0.8/pom.xml | 3 +-
.../connectors/kafka/FlinkKafkaConsumer08.java | 63 +++--
.../connectors/kafka/FlinkKafkaConsumer081.java | 1 +
.../connectors/kafka/FlinkKafkaConsumer082.java | 1 +
.../connectors/kafka/FlinkKafkaProducer.java | 10 +-
.../connectors/kafka/FlinkKafkaProducer08.java | 3 +-
.../kafka/Kafka08AvroTableSource.java | 8 +-
.../connectors/kafka/Kafka08JsonTableSink.java | 9 +-
.../kafka/Kafka08JsonTableSource.java | 4 +-
.../connectors/kafka/Kafka08TableSource.java | 4 +-
.../kafka/internals/ClosableBlockingQueue.java | 97 ++++---
.../kafka/internals/Kafka08Fetcher.java | 70 +++--
.../kafka/internals/KillerWatchDog.java | 2 +-
.../kafka/internals/PartitionInfoFetcher.java | 3 +-
.../internals/PeriodicOffsetCommitter.java | 23 +-
.../kafka/internals/SimpleConsumerThread.java | 96 ++++---
.../kafka/internals/ZookeeperOffsetHandler.java | 24 +-
.../kafka/Kafka08AvroTableSourceTest.java | 6 +-
.../connectors/kafka/Kafka08ITCase.java | 16 +-
.../kafka/Kafka08JsonTableSinkTest.java | 6 +-
.../kafka/Kafka08JsonTableSourceTest.java | 8 +-
.../connectors/kafka/Kafka08ProducerITCase.java | 4 +-
.../connectors/kafka/KafkaConsumer08Test.java | 43 ++--
.../connectors/kafka/KafkaLocalSystemTime.java | 48 ----
.../connectors/kafka/KafkaProducerTest.java | 18 +-
.../kafka/KafkaShortRetention08ITCase.java | 7 +-
.../kafka/KafkaTestEnvironmentImpl.java | 35 +--
.../internals/ClosableBlockingQueueTest.java | 84 +++---
.../flink-connector-kafka-0.9/pom.xml | 4 +-
.../connectors/kafka/FlinkKafkaConsumer09.java | 30 +--
.../connectors/kafka/FlinkKafkaProducer09.java | 3 +-
.../kafka/Kafka09AvroTableSource.java | 8 +-
.../connectors/kafka/Kafka09JsonTableSink.java | 9 +-
.../kafka/Kafka09JsonTableSource.java | 4 +-
.../connectors/kafka/Kafka09TableSource.java | 4 +-
.../connectors/kafka/internal/Handover.java | 32 +--
.../kafka/internal/Kafka09Fetcher.java | 16 +-
.../kafka/internal/KafkaConsumerCallBridge.java | 8 +-
.../kafka/internal/KafkaConsumerThread.java | 35 ++-
.../src/main/resources/log4j.properties | 1 -
.../kafka/Kafka09AvroTableSourceTest.java | 6 +-
.../connectors/kafka/Kafka09FetcherTest.java | 21 +-
.../connectors/kafka/Kafka09ITCase.java | 5 +-
.../kafka/Kafka09JsonTableSinkTest.java | 6 +-
.../kafka/Kafka09JsonTableSourceTest.java | 8 +-
.../connectors/kafka/Kafka09ProducerITCase.java | 4 +-
.../kafka/Kafka09SecuredRunITCase.java | 8 +-
.../connectors/kafka/KafkaProducerTest.java | 20 +-
.../kafka/KafkaTestEnvironmentImpl.java | 41 +--
.../connectors/kafka/internal/HandoverTest.java | 10 +-
.../src/test/resources/log4j-test.properties | 2 +-
.../flink-connector-kafka-base/pom.xml | 5 +-
.../kafka/FlinkKafkaConsumerBase.java | 101 ++++----
.../kafka/FlinkKafkaProducerBase.java | 44 ++--
.../connectors/kafka/KafkaAvroTableSource.java | 16 +-
.../connectors/kafka/KafkaJsonTableSink.java | 7 +-
.../connectors/kafka/KafkaJsonTableSource.java | 4 +-
.../connectors/kafka/KafkaTableSink.java | 11 +-
.../connectors/kafka/KafkaTableSource.java | 3 +-
.../kafka/config/OffsetCommitMode.java | 2 +-
.../connectors/kafka/config/StartupMode.java | 9 +-
.../kafka/internals/AbstractFetcher.java | 110 ++++----
.../kafka/internals/ExceptionProxy.java | 37 +--
.../kafka/internals/KafkaTopicPartition.java | 15 +-
.../internals/KafkaTopicPartitionState.java | 19 +-
.../KafkaTopicPartitionStateSentinel.java | 8 +-
...picPartitionStateWithPeriodicWatermarks.java | 23 +-
...cPartitionStateWithPunctuatedWatermarks.java | 25 +-
.../partitioner/FlinkFixedPartitioner.java | 17 +-
.../FlinkKafkaDelegatePartitioner.java | 1 +
.../AvroRowDeserializationSchema.java | 21 +-
.../AvroRowSerializationSchema.java | 13 +-
.../JSONDeserializationSchema.java | 6 +-
.../JSONKeyValueDeserializationSchema.java | 16 +-
.../JsonRowDeserializationSchema.java | 8 +-
.../JsonRowSerializationSchema.java | 11 +-
.../KeyedDeserializationSchema.java | 4 +-
.../KeyedDeserializationSchemaWrapper.java | 4 +-
.../serialization/KeyedSerializationSchema.java | 10 +-
.../KeyedSerializationSchemaWrapper.java | 3 +-
...eInformationKeyValueSerializationSchema.java | 32 +--
.../kafka/AvroRowDeSerializationSchemaTest.java | 10 +-
.../kafka/FlinkFixedPartitionerTest.java | 109 ++++++++
...inkKafkaConsumerBaseFrom11MigrationTest.java | 5 +-
...inkKafkaConsumerBaseFrom12MigrationTest.java | 31 +--
.../kafka/FlinkKafkaConsumerBaseTest.java | 23 +-
.../kafka/FlinkKafkaProducerBaseTest.java | 37 +--
.../kafka/JSONDeserializationSchemaTest.java | 7 +-
.../JSONKeyValueDeserializationSchemaTest.java | 8 +-
.../kafka/JsonRowDeserializationSchemaTest.java | 10 +-
.../kafka/JsonRowSerializationSchemaTest.java | 9 +-
.../KafkaConsumerPartitionAssignmentTest.java | 4 +-
.../connectors/kafka/KafkaConsumerTestBase.java | 256 +++++++++----------
.../connectors/kafka/KafkaProducerTestBase.java | 15 +-
.../kafka/KafkaShortRetentionTestBase.java | 30 +--
.../kafka/KafkaTableSinkTestBase.java | 15 +-
.../kafka/KafkaTableSourceTestBase.java | 22 +-
.../connectors/kafka/KafkaTestBase.java | 19 +-
.../connectors/kafka/KafkaTestEnvironment.java | 18 +-
.../kafka/TestFlinkFixedPartitioner.java | 104 --------
.../kafka/internals/AbstractFetcherTest.java | 24 +-
.../internals/KafkaTopicPartitionTest.java | 13 +-
.../kafka/testutils/AvroTestUtils.java | 18 +-
.../kafka/testutils/DataGenerators.java | 37 +--
.../kafka/testutils/FailingIdentityMapper.java | 23 +-
.../testutils/FakeStandardProducerConfig.java | 4 +
.../testutils/JobManagerCommunicationUtils.java | 36 +--
.../testutils/PartitionValidatingMapper.java | 8 +-
.../kafka/testutils/ThrottledMapper.java | 6 +-
.../kafka/testutils/Tuple2FlinkPartitioner.java | 4 +-
.../testutils/ValidatingExactlyOnceSink.java | 11 +-
.../testutils/ZooKeeperStringSerializer.java | 3 +-
.../src/test/resources/log4j-test.properties | 1 -
128 files changed, 1361 insertions(+), 1333 deletions(-)
----------------------------------------------------------------------
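Many of the hunks below are import reorderings. As a hedged illustration of the grouping the strict checkstyle rules enforce (this particular block is taken from the Kafka010AvroTableSource hunk further down; the three-group ordering is inferred from the reorderings in this commit):

// Flink imports first, then other third-party imports, then java/javax,
// each group separated by a blank line and sorted alphabetically.
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;

import java.util.Properties;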
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.10/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/pom.xml b/flink-connectors/flink-connector-kafka-0.10/pom.xml
index 231b22e..143cb7f 100644
--- a/flink-connectors/flink-connector-kafka-0.10/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.10/pom.xml
@@ -130,7 +130,7 @@ under the License.
<version>${kafka.version}</version>
<scope>test</scope>
</dependency>
-
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-tests_${scala.binary.version}</artifactId>
@@ -209,5 +209,5 @@ under the License.
</plugin>
</plugins>
</build>
-
+
</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
index 23fc84e..1bbd1dc 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
@@ -29,21 +29,21 @@ import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.util.PropertiesUtil;
import org.apache.flink.util.SerializedValue;
+
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Collections;
-import java.util.Map;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
-
/**
* The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
* Apache Kafka 0.10.x. The consumer can run in multiple parallel instances, each of which will pull
- * data from one or more Kafka partitions.
- *
+ * data from one or more Kafka partitions.
+ *
* <p>The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost
- * during a failure, and that the computation processes elements "exactly once".
+ * during a failure, and that the computation processes elements "exactly once".
* (Note: These guarantees naturally assume that Kafka itself does not loose any data.)</p>
*
* <p>Please note that Flink snapshots the offsets internally as part of its distributed checkpoints. The offsets
@@ -62,11 +62,10 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
private static final long serialVersionUID = 2324564345203409112L;
-
// ------------------------------------------------------------------------
/**
- * Creates a new Kafka streaming source consumer for Kafka 0.10.x
+ * Creates a new Kafka streaming source consumer for Kafka 0.10.x.
*
* @param topic
* The name of the topic that should be consumed.
@@ -82,7 +81,7 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
/**
* Creates a new Kafka streaming source consumer for Kafka 0.10.x
*
- * This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value
+ * <p>This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value
* pairs, offsets, and topic names from Kafka.
*
* @param topic
@@ -99,7 +98,7 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
/**
* Creates a new Kafka streaming source consumer for Kafka 0.10.x
*
- * This constructor allows passing multiple topics to the consumer.
+ * <p>This constructor allows passing multiple topics to the consumer.
*
* @param topics
* The Kafka topics to read from.
@@ -115,7 +114,7 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
/**
* Creates a new Kafka streaming source consumer for Kafka 0.10.x
*
- * This constructor allows passing multiple topics and a key/value deserialization schema.
+ * <p>This constructor allows passing multiple topics and a key/value deserialization schema.
*
* @param topics
* The Kafka topics to read from.
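As a hedged usage sketch of the constructors documented above (broker address, group id, topic name, and the class name Consumer010Sketch are placeholders, not values from this commit):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

public class Consumer010Sketch {
	public static void main(String[] args) throws Exception {
		// Placeholder connection settings.
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092");
		props.setProperty("group.id", "example-group");

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Single-topic constructor with a plain DeserializationSchema; the
		// multi-topic and KeyedDeserializationSchema variants follow the same pattern.
		env.addSource(new FlinkKafkaConsumer010<>("example-topic", new SimpleStringSchema(), props))
			.print();

		env.execute("Kafka 0.10 consumer sketch");
	}
}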
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index 711fe07..805bc4e 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -17,8 +17,6 @@
package org.apache.flink.streaming.connectors.kafka;
-import java.util.Properties;
-
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
@@ -37,34 +35,35 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
import org.apache.kafka.clients.producer.ProducerRecord;
+import java.util.Properties;
+
import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPartitionsByTopic;
import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList;
-
/**
* Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.10.x
*
- * Implementation note: This producer is a hybrid between a regular regular sink function (a)
+ * <p>Implementation note: This producer is a hybrid between a regular regular sink function (a)
* and a custom operator (b).
*
- * For (a), the class implements the SinkFunction and RichFunction interfaces.
+ * <p>For (a), the class implements the SinkFunction and RichFunction interfaces.
* For (b), it extends the StreamTask class.
*
- * Details about approach (a):
- *
+ * <p>Details about approach (a):
* Pre Kafka 0.10 producers only follow approach (a), allowing users to use the producer using the
* DataStream.addSink() method.
* Since the APIs exposed in that variant do not allow accessing the the timestamp attached to the record
* the Kafka 0.10 producer has a second invocation option, approach (b).
*
- * Details about approach (b):
+ * <p>Details about approach (b):
* Kafka 0.10 supports writing the timestamp attached to a record to Kafka. When adding the
* FlinkKafkaProducer010 using the FlinkKafkaProducer010.writeToKafkaWithTimestamps() method, the Kafka producer
* can access the internal record timestamp of the record and write it to Kafka.
*
- * All methods and constructors in this class are marked with the approach they are needed for.
+ * <p>All methods and constructors in this class are marked with the approach they are needed for.
*/
public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunction<T>, RichFunction {
@@ -79,7 +78,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
* Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
* the topic.
*
- * This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
+ * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
*
* @param inStream The stream to write to Kafka
* @param topicId ID of the Kafka topic.
@@ -93,12 +92,11 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<T>());
}
-
/**
* Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
* the topic.
*
- * This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
+ * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
*
* @param inStream The stream to write to Kafka
* @param topicId ID of the Kafka topic.
@@ -116,7 +114,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
* Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
* the topic.
*
- * This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
+ * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
*
* @param inStream The stream to write to Kafka
* @param topicId The name of the target topic
@@ -212,11 +210,11 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig) {
this(topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<T>());
}
-
+
/**
- * Create Kafka producer
+ * Create Kafka producer.
*
- * This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above)
+ * <p>This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above)
*/
public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
// We create a Kafka 09 producer instance here and only "override" (by intercepting) the
@@ -230,7 +228,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
* Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
* the topic.
*
- * This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
+ * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
*
* @param inStream The stream to write to Kafka
* @param topicId The name of the target topic
@@ -275,9 +273,9 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
}
/**
- * Create Kafka producer
+ * Create Kafka producer.
*
- * This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above)
+ * <p>This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above)
*
* @deprecated This is a deprecated constructor that does not correctly handle partitioning when
* producing to multiple topics. Use
@@ -306,13 +304,13 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
}
Long timestamp = null;
- if(this.writeTimestampToKafka) {
+ if (this.writeTimestampToKafka) {
timestamp = elementTimestamp;
}
ProducerRecord<byte[], byte[]> record;
int[] partitions = internalProducer.topicPartitionsMap.get(targetTopic);
- if(null == partitions) {
+ if (null == partitions) {
partitions = getPartitionsByTopic(targetTopic, internalProducer.producer);
internalProducer.topicPartitionsMap.put(targetTopic, partitions);
}
@@ -329,10 +327,8 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
internalProducer.producer.send(record, internalProducer.callback);
}
-
// ----------------- Helper methods implementing methods from SinkFunction and RichFunction (Approach (a)) ----
-
// ---- Configuration setters
/**
@@ -341,7 +337,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
* exceptions will be eventually thrown and cause the streaming program to
* fail (and enter recovery).
*
- * Method is only accessible for approach (a) (see above)
+ * <p>Method is only accessible for approach (a) (see above)
*
* @param logFailuresOnly The flag to indicate logging-only on exceptions.
*/
@@ -355,7 +351,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
* to be acknowledged by the Kafka producer on a checkpoint.
* This way, the producer can guarantee that messages in the Kafka buffers are part of the checkpoint.
*
- * Method is only accessible for approach (a) (see above)
+ * <p>Method is only accessible for approach (a) (see above)
*
* @param flush Flag indicating the flushing mode (true = flush on checkpoint)
*/
@@ -365,8 +361,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
}
/**
- * This method is used for approach (a) (see above)
- *
+ * This method is used for approach (a) (see above).
*/
@Override
public void open(Configuration parameters) throws Exception {
@@ -375,7 +370,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
}
/**
- * This method is used for approach (a) (see above)
+ * This method is used for approach (a) (see above).
*/
@Override
public IterationRuntimeContext getIterationRuntimeContext() {
@@ -384,7 +379,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
}
/**
- * This method is used for approach (a) (see above)
+ * This method is used for approach (a) (see above).
*/
@Override
public void setRuntimeContext(RuntimeContext t) {
@@ -395,7 +390,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
/**
* Invoke method for using the Sink as DataStream.addSink() sink.
*
- * This method is used for approach (a) (see above)
+ * <p>This method is used for approach (a) (see above)
*
* @param value The input record.
*/
@@ -404,14 +399,12 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
invokeInternal(value, Long.MAX_VALUE);
}
-
// ----------------- Helper methods and classes implementing methods from StreamSink (Approach (b)) ----
-
/**
* Process method for using the sink with timestamp support.
*
- * This method is used for approach (b) (see above)
+ * <p>This method is used for approach (b) (see above)
*/
@Override
public void processElement(StreamRecord<T> element) throws Exception {
@@ -467,5 +460,4 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
}
}
-
}
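For reference, the two invocation styles that the Javadoc above labels approach (a) and approach (b), as a hedged sketch; topic name, broker address, and the class name Producer010Sketch are placeholders:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class Producer010Sketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		DataStream<String> stream = env.fromElements("a", "b", "c");

		// Approach (a): regular sink via DataStream.addSink(); record
		// timestamps are not written to Kafka.
		stream.addSink(new FlinkKafkaProducer010<>(
				"example-topic",
				new SimpleStringSchema(),
				FlinkKafkaProducerBase.getPropertiesFromBrokerList("localhost:9092")));

		// Approach (b): custom operator via writeToKafkaWithTimestamps();
		// the record timestamp can be written to Kafka.
		FlinkKafkaProducer010.FlinkKafkaProducer010Configuration<String> config =
				FlinkKafkaProducer010.writeToKafkaWithTimestamps(
						stream,
						"example-topic",
						new SimpleStringSchema(),
						FlinkKafkaProducerBase.getPropertiesFromBrokerList("localhost:9092"));
		config.setWriteTimestampToKafka(true);

		env.execute("Kafka 0.10 producer sketch");
	}
}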
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
index 1b2abcc..9921428 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
@@ -18,13 +18,15 @@
package org.apache.flink.streaming.connectors.kafka;
-import java.util.Properties;
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
+
+import java.util.Properties;
+
/**
* Kafka {@link StreamTableSource} for Kafka 0.10.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
index 78ef28e..f400f6b 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
@@ -19,9 +19,9 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.types.Row;
-import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
import java.util.Properties;
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
index 03e9125..a6de13a 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
@@ -19,9 +19,9 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.types.Row;
-import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
import java.util.Properties;
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
index 586d841..eb4dfee 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
@@ -36,10 +36,10 @@ import java.util.Properties;
/**
* A fetcher that fetches data from Kafka brokers via the Kafka 0.10 consumer API.
- *
+ *
* <p>This fetcher re-uses basically all functionality of the 0.9 fetcher. It only additionally
* takes the KafkaRecord-attached timestamp and attaches it to the Flink records.
- *
+ *
* @param <T> The type of elements produced by the fetcher.
*/
public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
@@ -57,8 +57,7 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
KeyedDeserializationSchema<T> deserializer,
Properties kafkaProperties,
long pollTimeout,
- boolean useMetrics) throws Exception
- {
+ boolean useMetrics) throws Exception {
super(
sourceContext,
assignedPartitionsWithInitialOffsets,
@@ -88,7 +87,7 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
/**
* This method needs to be overridden because Kafka broke binary compatibility between 0.9 and 0.10,
- * changing binary signatures
+ * changing binary signatures.
*/
@Override
protected KafkaConsumerCallBridge010 createCallBridge() {
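The timestamp handling described in the class Javadoc above boils down to overriding the record-emission path of the 0.9 fetcher; a hedged sketch of that override (the exact method name and parameter list are an assumption based on this diff's context, not shown in the hunks above):

// Hedged sketch: the 0.10 fetcher forwards the Kafka-attached record
// timestamp into Flink's record emission instead of emitting without one.
@Override
protected void emitRecord(
		T record,
		KafkaTopicPartitionState<TopicPartition> partition,
		long offset,
		ConsumerRecord<?, ?> consumerRecord) throws Exception {
	// consumerRecord.timestamp() is new in the Kafka 0.10 consumer API.
	emitRecordWithTimestamp(record, partition, offset, consumerRecord.timestamp());
}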
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
index 0fda9a6..b621140 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
@@ -26,11 +26,11 @@ import java.util.List;
/**
* The ConsumerCallBridge simply calls the {@link KafkaConsumer#assign(java.util.Collection)} method.
- *
- * This indirection is necessary, because Kafka broke binary compatibility between 0.9 and 0.10,
+ *
+ * <p>This indirection is necessary, because Kafka broke binary compatibility between 0.9 and 0.10,
* changing {@code assign(List)} to {@code assign(Collection)}.
- *
- * Because of that, we need two versions whose compiled code goes against different method signatures.
+ *
+ * <p>Because of that, we need two versions whose compiled code goes against different method signatures.
*/
public class KafkaConsumerCallBridge010 extends KafkaConsumerCallBridge {
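A hedged sketch of the indirection this Javadoc describes (the method name assignPartitions is an assumption): the same Java source, compiled once against the 0.9 client and once against the 0.10 client, links to the two different assign signatures.

// Hedged sketch: identical source in both bridge classes; the compiled
// bytecode differs because the 0.9 client declares assign(List) while
// the 0.10 client declares assign(Collection).
public void assignPartitions(KafkaConsumer<?, ?> consumer, List<TopicPartition> topicPartitions) throws Exception {
	consumer.assign(topicPartitions);
}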
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties
index 6bdfb48..6eef174 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/log4j.properties
@@ -26,4 +26,3 @@ log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
# suppress the irrelevant (wrong) warnings from the netty channel handler
log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
-
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java
index ed93725..025fefc 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java
@@ -18,12 +18,16 @@
package org.apache.flink.streaming.connectors.kafka;
-import java.util.Properties;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.types.Row;
+import java.util.Properties;
+
+/**
+ * Tests for the {@link Kafka010AvroTableSource}.
+ */
public class Kafka010AvroTableSourceTest extends KafkaTableSourceTestBase {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
index 2d0551d..aedd4ba 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
@@ -39,10 +39,8 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
-
import org.junit.Test;
import org.junit.runner.RunWith;
-
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -63,7 +61,6 @@ import java.util.concurrent.locks.ReentrantLock;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyLong;
import static org.powermock.api.mockito.PowerMockito.doAnswer;
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
index add623e..22193b7 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
@@ -38,13 +38,17 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
+
import org.junit.Test;
import javax.annotation.Nullable;
+
import java.io.ByteArrayInputStream;
import java.io.IOException;
-
+/**
+ * IT cases for Kafka 0.10 .
+ */
public class Kafka010ITCase extends KafkaConsumerTestBase {
// ------------------------------------------------------------------------
@@ -83,7 +87,6 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
runFailOnDeployTest();
}
-
// --- source to partition mappings and exactly once ---
@Test(timeout = 60000)
@@ -170,7 +173,7 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
}
/**
- * Kafka 0.10 specific test, ensuring Timestamps are properly written to and read from Kafka
+ * Kafka 0.10 specific test, ensuring Timestamps are properly written to and read from Kafka.
*/
@Test(timeout = 60000)
public void testTimestamps() throws Exception {
@@ -193,9 +196,9 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
@Override
public void run(SourceContext<Long> ctx) throws Exception {
long i = 0;
- while(running) {
- ctx.collectWithTimestamp(i, i*2);
- if(i++ == 1000L) {
+ while (running) {
+ ctx.collectWithTimestamp(i, i * 2);
+ if (i++ == 1000L) {
running = false;
}
}
@@ -213,7 +216,7 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
@Override
public int partition(Long next, byte[] key, byte[] value, String targetTopic, int[] partitions) {
- return (int)(next % 3);
+ return (int) (next % 3);
}
});
prod.setParallelism(3);
@@ -235,7 +238,7 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
@Nullable
@Override
public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) {
- if(lastElement % 10 == 0) {
+ if (lastElement % 10 == 0) {
return new Watermark(lastElement);
}
return null;
@@ -278,7 +281,7 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
@Override
public void processElement(StreamRecord<Long> element) throws Exception {
elCount++;
- if(element.getValue() * 2 != element.getTimestamp()) {
+ if (element.getValue() * 2 != element.getTimestamp()) {
throw new RuntimeException("Invalid timestamp: " + element);
}
}
@@ -287,13 +290,13 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
public void processWatermark(Watermark mark) throws Exception {
wmCount++;
- if(lastWM <= mark.getTimestamp()) {
+ if (lastWM <= mark.getTimestamp()) {
lastWM = mark.getTimestamp();
} else {
throw new RuntimeException("Received watermark higher than the last one");
}
- if( mark.getTimestamp() % 10 != 0 && mark.getTimestamp() != Long.MAX_VALUE ) {
+ if (mark.getTimestamp() % 10 != 0 && mark.getTimestamp() != Long.MAX_VALUE) {
throw new RuntimeException("Invalid watermark: " + mark.getTimestamp());
}
}
@@ -301,11 +304,11 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
@Override
public void close() throws Exception {
super.close();
- if(elCount != 1000L) {
+ if (elCount != 1000L) {
throw new RuntimeException("Wrong final element count " + elCount);
}
- if(wmCount <= 2) {
+ if (wmCount <= 2) {
throw new RuntimeException("Almost no watermarks have been sent " + wmCount);
}
}
@@ -322,6 +325,7 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
this.ti = TypeInfoParser.parse("Long");
this.ser = ti.createSerializer(new ExecutionConfig());
}
+
@Override
public TypeInformation<Long> getProducedType() {
return ti;
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java
index 55e8b9c..092f5ea 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java
@@ -18,12 +18,16 @@
package org.apache.flink.streaming.connectors.kafka;
-import java.util.Properties;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
import org.apache.flink.types.Row;
+import java.util.Properties;
+
+/**
+ * Tests for the {@link Kafka010JsonTableSource}.
+ */
public class Kafka010JsonTableSourceTest extends KafkaTableSourceTestBase {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
index 42b9682..64a5a3f 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
@@ -18,10 +18,11 @@
package org.apache.flink.streaming.connectors.kafka;
-
import org.junit.Test;
-
+/**
+ * IT cases for the {@link FlinkKafkaProducer010}.
+ */
@SuppressWarnings("serial")
public class Kafka010ProducerITCase extends KafkaProducerTestBase {
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index c88c858..cb30fbf 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -15,9 +15,17 @@
* limitations under the License.
*/
-
package org.apache.flink.streaming.connectors.kafka;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.util.NetUtils;
+
import kafka.admin.AdminUtils;
import kafka.common.KafkaException;
import kafka.server.KafkaConfig;
@@ -27,14 +35,6 @@ import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingServer;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-import org.apache.flink.util.NetUtils;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
@@ -57,7 +57,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
- * An implementation of the KafkaServerProvider for Kafka 0.10
+ * An implementation of the KafkaServerProvider for Kafka 0.10 .
*/
public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
@@ -87,7 +87,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
@Override
public Properties getSecureProperties() {
Properties prop = new Properties();
- if(secureMode) {
+ if (secureMode) {
prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
prop.put("security.protocol", "SASL_PLAINTEXT");
prop.put("sasl.kerberos.service.name", "kafka");
@@ -95,7 +95,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
//add special timeout for Travis
prop.setProperty("zookeeper.session.timeout.ms", String.valueOf(zkTimeout));
prop.setProperty("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout));
- prop.setProperty("metadata.fetch.timeout.ms","120000");
+ prop.setProperty("metadata.fetch.timeout.ms", "120000");
}
return prop;
}
@@ -122,7 +122,6 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
return new StreamSink<>(prod);
}
-
@Override
public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
@@ -176,7 +175,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
@Override
public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) {
//increase the timeout since in Travis ZK connection takes long time for secure connection.
- if(secureMode) {
+ if (secureMode) {
//run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
numKafkaServers = 1;
zkTimeout = zkTimeout * 15;
@@ -203,7 +202,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
brokers = null;
try {
- zookeeper = new TestingServer(- 1, tmpZkDir);
+ zookeeper = new TestingServer(-1, tmpZkDir);
zookeeperConnectionString = zookeeper.getConnectString();
LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
@@ -213,7 +212,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
for (int i = 0; i < numKafkaServers; i++) {
brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
- if(secureMode) {
+ if (secureMode) {
brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.SASL_PLAINTEXT)) + ",";
} else {
brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ",";
@@ -299,7 +298,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
final long deadline = System.nanoTime() + 30_000_000_000L;
do {
try {
- if(secureMode) {
+ if (secureMode) {
//increase wait time since in Travis ZK timeout occurs frequently
int wait = zkTimeout / 100;
LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic);
@@ -315,7 +314,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
// create a new ZK utils connection
ZkUtils checkZKConn = getZkUtils();
- if(AdminUtils.topicExists(checkZKConn, topic)) {
+ if (AdminUtils.topicExists(checkZKConn, topic)) {
checkZKConn.close();
return;
}
@@ -343,7 +342,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
/**
- * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
+ * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed).
*/
protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Exception {
Properties kafkaProperties = new Properties();
@@ -359,7 +358,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
// for CI stability, increase zookeeper session timeout
kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout);
kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout);
- if(additionalServerProperties != null) {
+ if (additionalServerProperties != null) {
kafkaProperties.putAll(additionalServerProperties);
}
@@ -370,7 +369,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
kafkaProperties.put("port", Integer.toString(kafkaPort));
//to support secure kafka cluster
- if(secureMode) {
+ if (secureMode) {
LOG.info("Adding Kafka secure configurations");
kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.8/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/pom.xml b/flink-connectors/flink-connector-kafka-0.8/pom.xml
index 5e2ed2d..b6b0336 100644
--- a/flink-connectors/flink-connector-kafka-0.8/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.8/pom.xml
@@ -175,7 +175,6 @@ under the License.
</dependencies>
-
<build>
<plugins>
<plugin>
@@ -215,5 +214,5 @@ under the License.
</plugin>
</plugins>
</build>
-
+
</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
index 858a790..6c7b94d 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -17,13 +17,6 @@
package org.apache.flink.streaming.connectors.kafka;
-import kafka.cluster.Broker;
-import kafka.common.ErrorMapping;
-import kafka.javaapi.PartitionMetadata;
-import kafka.javaapi.TopicMetadata;
-import kafka.javaapi.TopicMetadataRequest;
-import kafka.javaapi.consumer.SimpleConsumer;
-
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -39,6 +32,12 @@ import org.apache.flink.util.NetUtils;
import org.apache.flink.util.PropertiesUtil;
import org.apache.flink.util.SerializedValue;
+import kafka.cluster.Broker;
+import kafka.common.ErrorMapping;
+import kafka.javaapi.PartitionMetadata;
+import kafka.javaapi.TopicMetadata;
+import kafka.javaapi.TopicMetadataRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.Node;
@@ -46,25 +45,25 @@ import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.nio.channels.ClosedChannelException;
-import java.util.Collections;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
-import static org.apache.flink.util.PropertiesUtil.getInt;
import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.PropertiesUtil.getInt;
/**
* The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
* Apache Kafka 0.8.x. The consumer can run in multiple parallel instances, each of which will pull
- * data from one or more Kafka partitions.
- *
+ * data from one or more Kafka partitions.
+ *
* <p>The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost
- * during a failure, and that the computation processes elements "exactly once".
+ * during a failure, and that the computation processes elements "exactly once".
* (Note: These guarantees naturally assume that Kafka itself does not loose any data.)</p>
- *
+ *
* <p>Flink's Kafka Consumer is designed to be compatible with Kafka's High-Level Consumer API (0.8.x).
* Most of Kafka's configuration variables can be used with this consumer as well:
* <ul>
@@ -74,11 +73,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* <li>auto.offset.reset with the values "largest", "smallest"</li>
* <li>fetch.wait.max.ms</li>
* </ul>
- * </li>
- * </ul>
- *
+ *
* <h1>Offset handling</h1>
- *
+ *
* <p>Offsets whose records have been read and are checkpointed will be committed back to ZooKeeper
* by the offset handler. In addition, the offset handler finds the point where the source initially
* starts reading from the stream, when the streaming job is started.</p>
@@ -93,7 +90,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*
* <p>When using a Kafka topic to send data between Flink jobs, we recommend using the
* {@see TypeInformationSerializationSchema} and {@see TypeInformationKeyValueSerializationSchema}.</p>
- *
+ *
* <p><b>NOTE:</b> The implementation currently accesses partition metadata when the consumer
* is constructed. That means that the client that submits the program needs to be able to
* reach the Kafka brokers or ZooKeeper.</p>
@@ -102,7 +99,7 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
private static final long serialVersionUID = -6272159445203409112L;
- /** Configuration key for the number of retries for getting the partition info */
+ /** Configuration key for the number of retries for getting the partition info. */
public static final String GET_PARTITIONS_RETRIES_KEY = "flink.get-partitions.retry";
/** Default number of retries for getting the partition info. One retry means going through the full list of brokers */
@@ -110,13 +107,13 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
// ------------------------------------------------------------------------
- /** The properties to parametrize the Kafka consumer and ZooKeeper client */
+ /** The properties to parametrize the Kafka consumer and ZooKeeper client. */
private final Properties kafkaProperties;
// ------------------------------------------------------------------------
/**
- * Creates a new Kafka streaming source consumer for Kafka 0.8.x
+ * Creates a new Kafka streaming source consumer for Kafka 0.8.x.
*
* @param topic
* The name of the topic that should be consumed.
@@ -132,7 +129,7 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
/**
* Creates a new Kafka streaming source consumer for Kafka 0.8.x
*
- * This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value
+ * <p>This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value
* pairs, offsets, and topic names from Kafka.
*
* @param topic
@@ -149,7 +146,7 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
/**
* Creates a new Kafka streaming source consumer for Kafka 0.8.x
*
- * This constructor allows passing multiple topics to the consumer.
+ * <p>This constructor allows passing multiple topics to the consumer.
*
* @param topics
* The Kafka topics to read from.
@@ -165,8 +162,8 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
/**
* Creates a new Kafka streaming source consumer for Kafka 0.8.x
*
- * This constructor allows passing multiple topics and a key/value deserialization schema.
- *
+ * <p>This constructor allows passing multiple topics and a key/value deserialization schema.
+ *
* @param topics
* The Kafka topics to read from.
* @param deserializer
@@ -245,14 +242,14 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
/**
* Send request to Kafka to get partitions for topic.
- *
+ *
* @param topics The name of the topics.
- * @param properties The properties for the Kafka Consumer that is used to query the partitions for the topic.
+ * @param properties The properties for the Kafka Consumer that is used to query the partitions for the topic.
*/
public static List<KafkaTopicPartitionLeader> getPartitionsForTopic(List<String> topics, Properties properties) {
String seedBrokersConfString = properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
final int numRetries = getInt(properties, GET_PARTITIONS_RETRIES_KEY, DEFAULT_GET_PARTITIONS_RETRIES);
-
+
checkNotNull(seedBrokersConfString, "Configuration property %s not set", ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
String[] seedBrokers = seedBrokersConfString.split(",");
List<KafkaTopicPartitionLeader> partitions = new ArrayList<>();
@@ -328,7 +325,7 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
}
/**
- * Turn a broker instance into a node instance
+ * Turn a broker instance into a node instance.
* @param broker broker instance
* @return Node representing the given broker
*/
@@ -337,7 +334,7 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
}
/**
- * Validate the ZK configuration, checking for required parameters
+ * Validate the ZK configuration, checking for required parameters.
* @param props Properties to check
*/
protected static void validateZooKeeperConfig(Properties props) {
@@ -348,7 +345,7 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
throw new IllegalArgumentException("Required property '" + ConsumerConfig.GROUP_ID_CONFIG
+ "' has not been set in the properties");
}
-
+
try {
//noinspection ResultOfMethodCallIgnored
Integer.parseInt(props.getProperty("zookeeper.session.timeout.ms", "0"));
@@ -356,7 +353,7 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
catch (NumberFormatException e) {
throw new IllegalArgumentException("Property 'zookeeper.session.timeout.ms' is not a valid integer");
}
-
+
try {
//noinspection ResultOfMethodCallIgnored
Integer.parseInt(props.getProperty("zookeeper.connection.timeout.ms", "0"));
@@ -369,7 +366,7 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
/**
* Validate that at least one seed broker is valid in case of a
* ClosedChannelException.
- *
+ *
* @param seedBrokers
* array containing the seed brokers e.g. ["host1:port1",
* "host2:port2"]
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
index 4e4050f..4102bf8 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
index aeefcc8..7ba5103 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 98dac3e..434286e 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -22,8 +22,8 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import java.util.Properties;
+import java.util.Properties;
/**
* THIS CLASS IS DEPRECATED. Use FlinkKafkaProducer08 instead.
@@ -38,7 +38,7 @@ public class FlinkKafkaProducer<IN> extends FlinkKafkaProducer08<IN> {
*/
@Deprecated
public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
- super(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), (FlinkKafkaPartitioner<IN>)null);
+ super(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), (FlinkKafkaPartitioner<IN>) null);
}
/**
@@ -46,7 +46,7 @@ public class FlinkKafkaProducer<IN> extends FlinkKafkaProducer08<IN> {
*/
@Deprecated
public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
- super(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, (FlinkKafkaPartitioner<IN>)null);
+ super(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, (FlinkKafkaPartitioner<IN>) null);
}
/**
@@ -63,7 +63,7 @@ public class FlinkKafkaProducer<IN> extends FlinkKafkaProducer08<IN> {
*/
@Deprecated
public FlinkKafkaProducer(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
- super(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), (FlinkKafkaPartitioner<IN>)null);
+ super(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), (FlinkKafkaPartitioner<IN>) null);
}
/**
@@ -71,7 +71,7 @@ public class FlinkKafkaProducer<IN> extends FlinkKafkaProducer08<IN> {
*/
@Deprecated
public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
- super(topicId, serializationSchema, producerConfig, (FlinkKafkaPartitioner<IN>)null);
+ super(topicId, serializationSchema, producerConfig, (FlinkKafkaPartitioner<IN>) null);
}
/**
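[Editor's note: since the class is deprecated in favor of FlinkKafkaProducer08, a migration sketch; broker list, topic, and schema are placeholders.]

    // before (deprecated):
    //   new FlinkKafkaProducer<>("localhost:9092", "my-topic", new SimpleStringSchema());
    // after:
    FlinkKafkaProducer08<String> producer =
        new FlinkKafkaProducer08<>("localhost:9092", "my-topic", new SimpleStringSchema());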
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
index 08dcb2f..a14768b 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
@@ -27,11 +27,10 @@ import org.apache.flink.streaming.util.serialization.SerializationSchema;
import java.util.Properties;
-
/**
* Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.8.
*
- * Please note that this producer does not have any reliability guarantees.
+ * <p>Please note that this producer does not have any reliability guarantees.
*
* @param <IN> Type of the messages to write into Kafka.
*/
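[Editor's note: for illustration, a minimal pipeline using this sink; all names are placeholders, and, as the javadoc says, there are no reliability guarantees.]

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.fromElements("a", "b", "c")
        .addSink(new FlinkKafkaProducer08<>(
            "localhost:9092", "my-topic", new SimpleStringSchema()));
    env.execute("kafka-08-producer-sketch");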
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
index 1a68c05..a1bea78 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
@@ -18,13 +18,15 @@
package org.apache.flink.streaming.connectors.kafka;
-import java.util.Properties;
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
+
+import java.util.Properties;
+
/**
* Kafka {@link StreamTableSource} for Kafka 0.8.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
index 80bd180..79406d8 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
@@ -15,13 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.types.Row;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.types.Row;
import java.util.Properties;
@@ -29,9 +30,9 @@ import java.util.Properties;
* Kafka 0.8 {@link KafkaTableSink} that serializes data in JSON format.
*/
public class Kafka08JsonTableSink extends KafkaJsonTableSink {
-
+
/**
- * Creates {@link KafkaTableSink} for Kafka 0.8
+ * Creates {@link KafkaTableSink} for Kafka 0.8.
*
* @param topic topic in Kafka to which table is written
* @param properties properties to connect to Kafka
@@ -42,7 +43,7 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink {
}
/**
- * Creates {@link KafkaTableSink} for Kafka 0.8
+ * Creates {@link KafkaTableSink} for Kafka 0.8.
*
* @param topic topic in Kafka to which table is written
* @param properties properties to connect to Kafka
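[Editor's note: a usage sketch for these constructors, assuming the FlinkFixedPartitioner that ships alongside the FlinkKafkaPartitioner API; topic and properties are placeholders.]

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    Kafka08JsonTableSink sink =
        new Kafka08JsonTableSink("my-topic", props, new FlinkFixedPartitioner<>());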
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
index 1555a3b..05a2c71 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
@@ -19,9 +19,9 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.types.Row;
-import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
import java.util.Properties;
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
index e1e481c..9536306 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
@@ -19,9 +19,9 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.types.Row;
-import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
import java.util.Properties;
http://git-wip-us.apache.org/repos/asf/flink/blob/28e8043b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
index e31dcac..da61dd0 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
@@ -38,27 +38,27 @@ import static java.util.Objects.requireNonNull;
* are available and mark themselves as shut down.</li>
* <li>The queue allows polling batches of elements in a single call.</li>
* </ol>
- *
- * The queue has no capacity restriction and is safe for multiple producers and consumers.
- *
+ *
+ * <p>The queue has no capacity restriction and is safe for multiple producers and consumers.
+ *
* <p>Note: Null elements are prohibited.
- *
+ *
* @param <E> The type of elements in the queue.
*/
public class ClosableBlockingQueue<E> {
- /** The lock used to make queue accesses and open checks atomic */
+ /** The lock used to make queue accesses and open checks atomic. */
private final ReentrantLock lock;
-
- /** The condition on which blocking get-calls wait if the queue is empty */
+
+ /** The condition on which blocking get-calls wait if the queue is empty. */
private final Condition nonEmpty;
-
- /** The deque of elements */
+
+ /** The deque of elements. */
private final ArrayDeque<E> elements;
-
- /** Flag marking the status of the queue */
+
+ /** Flag marking the status of the queue. */
private volatile boolean open;
-
+
// ------------------------------------------------------------------------
/**
@@ -72,22 +72,21 @@ public class ClosableBlockingQueue<E> {
* Creates a new empty queue, reserving space for at least the specified number
* of elements. The queue can still grow if more elements are added than the
* reserved space.
- *
+ *
* @param initialSize The number of elements to reserve space for.
*/
public ClosableBlockingQueue(int initialSize) {
this.lock = new ReentrantLock(true);
this.nonEmpty = this.lock.newCondition();
-
+
this.elements = new ArrayDeque<>(initialSize);
this.open = true;
-
-
+
}
/**
* Creates a new queue that contains the given elements.
- *
+ *
* @param initialElements The elements to initially add to the queue.
*/
public ClosableBlockingQueue(Collection<? extends E> initialElements) {
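[Editor's note: the three constructors in a short sketch.]

    ClosableBlockingQueue<Integer> q1 = new ClosableBlockingQueue<>();     // default reservation
    ClosableBlockingQueue<Integer> q2 = new ClosableBlockingQueue<>(1000); // reserve ~1000 slots
    ClosableBlockingQueue<Integer> q3 =
        new ClosableBlockingQueue<>(Arrays.asList(1, 2, 3));               // pre-populated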
@@ -127,7 +126,7 @@ public class ClosableBlockingQueue<E> {
public boolean isOpen() {
return open;
}
-
+
/**
* Tries to close the queue. Closing the queue only succeeds when no elements are
* in the queue when this method is called. Checking whether the queue is empty, and
@@ -155,25 +154,25 @@ public class ClosableBlockingQueue<E> {
lock.unlock();
}
}
-
+
// ------------------------------------------------------------------------
// Adding / Removing elements
// ------------------------------------------------------------------------
-
+
/**
* Tries to add an element to the queue, if the queue is still open. Checking whether the queue
* is open and adding the element is one atomic operation.
- *
+ *
* <p>Unlike the {@link #add(Object)} method, this method never throws an exception,
* but only indicates via the return code if the element was added or the
* queue was closed.
- *
+ *
* @param element The element to add.
* @return True, if the element was added, false if the queue was closed.
*/
public boolean addIfOpen(E element) {
requireNonNull(element);
-
+
lock.lock();
try {
if (open) {
@@ -191,7 +190,7 @@ public class ClosableBlockingQueue<E> {
/**
* Adds the element to the queue, or fails with an exception, if the queue is closed.
* Checking whether the queue is open and adding the element is one atomic operation.
- *
+ *
* @param element The element to add.
* @throws IllegalStateException Thrown, if the queue is closed.
*/
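[Editor's note: the difference between the two add variants, as a sketch.]

    ClosableBlockingQueue<String> queue = new ClosableBlockingQueue<>();
    if (!queue.addIfOpen("record")) {
        // queue was closed concurrently; signalled via the return value, no exception
    }
    queue.add("record"); // same effect while open, but throws IllegalStateException once closed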
@@ -215,13 +214,13 @@ public class ClosableBlockingQueue<E> {
/**
* Returns the queue's next element without removing it, if the queue is non-empty.
- * Otherwise, returns null.
+ * Otherwise, returns null.
*
* <p>The method throws an {@code IllegalStateException} if the queue is closed.
* Checking whether the queue is open and getting the next element is one atomic operation.
- *
+ *
* <p>This method never blocks.
- *
+ *
* @return The queue's next element, or null, if the queue is empty.
* @throws IllegalStateException Thrown, if the queue is closed.
*/
@@ -244,7 +243,7 @@ public class ClosableBlockingQueue<E> {
/**
* Returns the queue's next element and removes it, if the queue is non-empty.
- * Otherwise, this method returns null.
+ * Otherwise, this method returns null.
*
* <p>The method throws an {@code IllegalStateException} if the queue is closed.
* Checking whether the queue is open and removing the next element is one atomic operation.
@@ -273,7 +272,7 @@ public class ClosableBlockingQueue<E> {
/**
* Returns all of the queue's current elements in a list, if the queue is non-empty.
- * Otherwise, this method returns null.
+ * Otherwise, this method returns null.
*
* <p>The method throws an {@code IllegalStateException} if the queue is closed.
* Checking whether the queue is open and removing the elements is one atomic operation.
@@ -305,12 +304,12 @@ public class ClosableBlockingQueue<E> {
/**
* Returns the next element in the queue. If the queue is empty, this method
* waits until at least one element is added.
- *
+ *
* <p>The method throws an {@code IllegalStateException} if the queue is closed.
* Checking whether the queue is open and removing the next element is one atomic operation.
- *
+ *
* @return The next element in the queue, never null.
- *
+ *
* @throws IllegalStateException Thrown, if the queue is closed.
* @throws InterruptedException Thrown, if the thread is interrupted while waiting for an
* element to be added.
@@ -321,7 +320,7 @@ public class ClosableBlockingQueue<E> {
while (open && elements.isEmpty()) {
nonEmpty.await();
}
-
+
if (open) {
return elements.removeFirst();
} else {
@@ -336,13 +335,13 @@ public class ClosableBlockingQueue<E> {
* Returns the next element in the queue. If the queue is empty, this method
* waits at most a certain time until an element becomes available. If no element
* is available after that time, the method returns null.
- *
+ *
* <p>The method throws an {@code IllegalStateException} if the queue is closed.
* Checking whether the queue is open and removing the next element is one atomic operation.
- *
+ *
* @param timeoutMillis The number of milliseconds to block, at most.
* @return The next element in the queue, or null, if the timeout expires before an element is available.
- *
+ *
* @throws IllegalStateException Thrown, if the queue is closed.
* @throws InterruptedException Thrown, if the thread is interrupted while waiting for an
* element to be added.
@@ -354,16 +353,16 @@ public class ClosableBlockingQueue<E> {
} else if (timeoutMillis < 0L) {
throw new IllegalArgumentException("invalid timeout");
}
-
+
final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
-
+
lock.lock();
try {
- while (open && elements.isEmpty() && timeoutMillis > 0) {
+ while (open && elements.isEmpty() && timeoutMillis > 0) {
nonEmpty.await(timeoutMillis, TimeUnit.MILLISECONDS);
timeoutMillis = (deadline - System.nanoTime()) / 1_000_000L;
}
-
+
if (!open) {
throw new IllegalStateException("queue is closed");
}
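[Editor's note: a sketch of the timed variant; a null return distinguishes a timeout from the IllegalStateException thrown for a closed queue.]

    String next = queue.getElementBlocking(1000L); // wait at most one second
    if (next == null) {
        // timed out: the queue stayed empty (but open) for the full second
    }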
@@ -383,12 +382,12 @@ public class ClosableBlockingQueue<E> {
* at least one element is added.
*
* <p>This method always returns a list with at least one element.
- *
+ *
* <p>The method throws an {@code IllegalStateException} if the queue is closed.
* Checking whether the queue is open and removing the next element is one atomic operation.
- *
+ *
* @return A list with all elements in the queue, always at least one element.
- *
+ *
* @throws IllegalStateException Thrown, if the queue is closed.
* @throws InterruptedException Thrown, if the thread is interrupted while waiting for an
* element to be added.
@@ -415,13 +414,13 @@ public class ClosableBlockingQueue<E> {
* Gets all the elements currently in the queue, or blocks until at least one element
* has been added. This method is similar to {@link #getBatchBlocking()}, but takes
* a maximum number of milliseconds to wait before returning.
- *
+ *
* <p>This method never returns null, but an empty list, if the queue is empty when
* the method is called and the request times out before an element was added.
- *
+ *
* <p>The method throws an {@code IllegalStateException} if the queue is closed.
* Checking whether the queue is open and removing the next element is one atomic operation.
- *
+ *
* @param timeoutMillis The number of milliseconds to wait, at most.
* @return A list with all elements in the queue, possibly an empty list.
*
@@ -461,11 +460,11 @@ public class ClosableBlockingQueue<E> {
lock.unlock();
}
}
-
+
// ------------------------------------------------------------------------
// Standard Utilities
// ------------------------------------------------------------------------
-
+
@Override
public int hashCode() {
int hashCode = 17;
@@ -482,7 +481,7 @@ public class ClosableBlockingQueue<E> {
} else if (obj != null && obj.getClass() == ClosableBlockingQueue.class) {
@SuppressWarnings("unchecked")
ClosableBlockingQueue<E> that = (ClosableBlockingQueue<E>) obj;
-
+
if (this.elements.size() == that.elements.size()) {
Iterator<E> thisElements = this.elements.iterator();
for (E thatNext : that.elements) {