Posted to commits@flink.apache.org by se...@apache.org on 2015/09/08 11:45:45 UTC
[1/2] flink git commit: [FLINK-2372] Add new FlinkKafkaProducer based on the new producer API
Repository: flink
Updated Branches:
refs/heads/release-0.10.0-milestone-1 41a1fdd54 -> 563ee9ad8
http://git-wip-us.apache.org/repos/asf/flink/blob/563ee9ad/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index c4438f0..97484b2 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -38,12 +38,14 @@ import org.apache.flink.streaming.connectors.kafka.internals.ZooKeeperStringSeri
import org.apache.flink.streaming.connectors.kafka.testutils.SuccessException;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.util.TestLogger;
import org.apache.kafka.common.PartitionInfo;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
import java.io.File;
import java.io.IOException;
@@ -51,6 +53,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -69,7 +72,7 @@ import static org.junit.Assert.fail;
* as per commit <i>bc6b2b2d5f6424d5f377aa6c0871e82a956462ef</i></p>
*/
@SuppressWarnings("serial")
-public abstract class KafkaTestBase {
+public abstract class KafkaTestBase extends TestLogger {
protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestBase.class);
@@ -91,9 +94,13 @@ public abstract class KafkaTestBase {
protected static ForkableFlinkMiniCluster flink;
protected static int flinkPort;
-
-
-
+
+ protected static FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
+
+ protected static List<File> tmpKafkaDirs;
+
+ protected static String kafkaHost = "localhost";
+
// ------------------------------------------------------------------------
// Setup and teardown of the mini clusters
// ------------------------------------------------------------------------
@@ -114,14 +121,14 @@ public abstract class KafkaTestBase {
tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir*" + (UUID.randomUUID().toString()));
assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
- List<File> tmpKafkaDirs = new ArrayList<>(NUMBER_OF_KAFKA_SERVERS);
+ tmpKafkaDirs = new ArrayList<>(NUMBER_OF_KAFKA_SERVERS);
for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
File tmpDir = new File(tmpKafkaParent, "server-" + i);
assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
tmpKafkaDirs.add(tmpDir);
}
- String kafkaHost = "localhost";
+
int zkPort = NetUtils.getAvailablePort();
zookeeperConnectionString = "localhost:" + zkPort;
@@ -157,6 +164,7 @@ public abstract class KafkaTestBase {
standardProps.setProperty("group.id", "flink-tests");
standardProps.setProperty("auto.commit.enable", "false");
standardProps.setProperty("zookeeper.session.timeout.ms", "12000"); // 6 seconds is default. Seems to be too small for travis.
+ standardProps.setProperty("zookeeper.connection.timeout.ms", "20000");
standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning.
standardProps.setProperty("fetch.message.max.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
@@ -185,7 +193,9 @@ public abstract class KafkaTestBase {
LOG.info("-------------------------------------------------------------------------");
flinkPort = -1;
- flink.shutdown();
+ if (flink != null) {
+ flink.shutdown();
+ }
for (KafkaServer broker : brokers) {
if (broker != null) {
@@ -231,7 +241,7 @@ public abstract class KafkaTestBase {
/**
* Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
*/
- private static KafkaServer getKafkaServer(int brokerId, File tmpFolder,
+ protected static KafkaServer getKafkaServer(int brokerId, File tmpFolder,
String kafkaHost,
String zookeeperConnectionString) throws Exception {
Properties kafkaProperties = new Properties();
@@ -245,7 +255,11 @@ public abstract class KafkaTestBase {
kafkaProperties.put("log.dir", tmpFolder.toString());
kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
kafkaProperties.put("message.max.bytes", "" + (50 * 1024 * 1024));
- kafkaProperties.put("replica.fetch.max.bytes", "" + (50 * 1024 * 1024));
+ kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024));
+
+ // for CI stability, increase zookeeper session timeout
+ kafkaProperties.put("zookeeper.session.timeout.ms", "20000");
+
KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
KafkaServer server = new KafkaServer(kafkaConfig, new KafkaLocalSystemTime());
http://git-wip-us.apache.org/repos/asf/flink/blob/563ee9ad/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
new file mode 100644
index 0000000..75fdd46
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+
+public class TestFixedPartitioner {
+
+
+ /**
+ * <pre>
+ * Flink Sinks: Kafka Partitions
+ * 1 ----------------> 1
+ * 2 --------------/
+ * 3 -------------/
+ * 4 ------------/
+ * </pre>
+ */
+ @Test
+ public void testMoreFlinkThanBrokers() {
+ FixedPartitioner part = new FixedPartitioner();
+
+ int[] partitions = new int[]{0};
+
+ part.open(0, 4, partitions);
+ Assert.assertEquals(0, part.partition("abc1", partitions.length));
+
+ part.open(1, 4, partitions);
+ Assert.assertEquals(0, part.partition("abc2", partitions.length));
+
+ part.open(2, 4, partitions);
+ Assert.assertEquals(0, part.partition("abc3", partitions.length));
+ Assert.assertEquals(0, part.partition("abc3", partitions.length)); // check if it is changing ;)
+
+ part.open(3, 4, partitions);
+ Assert.assertEquals(0, part.partition("abc4", partitions.length));
+ }
+
+ /**
+ *
+ * <pre>
+ * Flink Sinks: Kafka Partitions
+ * 1 ----------------> 1
+ * 2 ----------------> 2
+ * 3
+ * 4
+ * 5
+ *
+ * </pre>
+ */
+ @Test
+ public void testFewerPartitions() {
+ FixedPartitioner part = new FixedPartitioner();
+
+ int[] partitions = new int[]{0, 1, 2, 3, 4};
+ part.open(0, 2, partitions);
+ Assert.assertEquals(0, part.partition("abc1", partitions.length));
+ Assert.assertEquals(0, part.partition("abc1", partitions.length));
+
+ part.open(1, 2, partitions);
+ Assert.assertEquals(1, part.partition("abc1", partitions.length));
+ Assert.assertEquals(1, part.partition("abc1", partitions.length));
+ }
+
+ /*
+ * Flink Sinks: Kafka Partitions
+ * 1 ------------>---> 1
+ * 2 -----------/----> 2
+ * 3 ----------/
+ */
+ @Test
+ public void testMixedCase() {
+ FixedPartitioner part = new FixedPartitioner();
+ int[] partitions = new int[]{0,1};
+
+ part.open(0, 3, partitions);
+ Assert.assertEquals(0, part.partition("abc1", partitions.length));
+
+ part.open(1, 3, partitions);
+ Assert.assertEquals(1, part.partition("abc1", partitions.length));
+
+ part.open(2, 3, partitions);
+ Assert.assertEquals(0, part.partition("abc1", partitions.length));
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/563ee9ad/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java
index 27ad2e8..1437a2f 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java
@@ -23,7 +23,6 @@ import kafka.admin.AdminUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.flink.streaming.connectors.kafka.KafkaTestBase;
-import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
import org.junit.Test;
import java.util.Properties;
http://git-wip-us.apache.org/repos/asf/flink/blob/563ee9ad/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index a887e4f..32377ae 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -26,8 +26,8 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.connectors.kafka.KafkaSink;
-import org.apache.flink.streaming.connectors.kafka.SerializableKafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
@@ -69,8 +69,9 @@ public class DataGenerators {
}
});
- stream.addSink(new KafkaSink<Tuple2<Integer, Integer>>(brokerConnection, topic,
- new TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(resultType, env.getConfig()),
+ stream.addSink(new FlinkKafkaProducer<>(topic,
+ new TypeInformationSerializationSchema<>(resultType, env.getConfig()),
+ FlinkKafkaProducer.getPropertiesFromBrokerList(brokerConnection),
new Tuple2Partitioner(numPartitions)
));
@@ -131,9 +132,10 @@ public class DataGenerators {
stream
.rebalance()
- .addSink(new KafkaSink<>(brokerConnection, topic,
+ .addSink(new FlinkKafkaProducer<>(topic,
new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig()),
- new SerializableKafkaPartitioner() {
+ FlinkKafkaProducer.getPropertiesFromBrokerList(brokerConnection),
+ new KafkaPartitioner() {
@Override
public int partition(Object key, int numPartitions) {
return ((Integer) key) % numPartitions;
@@ -164,9 +166,10 @@ public class DataGenerators {
@Override
public void run() {
// we manually feed data into the Kafka sink
- KafkaSink<String> producer = null;
+ FlinkKafkaProducer<String> producer = null;
try {
- producer = new KafkaSink<>(kafkaConnectionString, topic, new JavaDefaultStringSchema());
+ producer = new FlinkKafkaProducer<>(kafkaConnectionString, topic, new JavaDefaultStringSchema());
+ producer.setRuntimeContext(new MockRuntimeContext(1,0));
producer.open(new Configuration());
final StringBuilder bld = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/flink/blob/563ee9ad/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
index 6ffaac4..b762e21 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
@@ -19,13 +19,15 @@
package org.apache.flink.streaming.connectors.kafka.testutils;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.connectors.kafka.SerializableKafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+
+import java.io.Serializable;
/**
* Special partitioner that uses the first field of a 2-tuple as the partition,
* and that expects a specific number of partitions.
*/
-public class Tuple2Partitioner implements SerializableKafkaPartitioner {
+public class Tuple2Partitioner extends KafkaPartitioner implements Serializable {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/563ee9ad/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
index 553ad61..21342b2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
@@ -36,5 +36,5 @@ public interface SerializationSchema<T, R> extends Serializable {
* The incoming element to be serialized
* @return The serialized element.
*/
- public R serialize(T element);
+ R serialize(T element);
}
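For readers unfamiliar with the interface above: a minimal SerializationSchema implementation (illustrative only, not part of this commit; the class name is made up) could look like this:

import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.util.serialization.SerializationSchema;

// Turns Strings into UTF-8 encoded byte arrays, matching the
// SerializationSchema<T, R> contract shown in the diff above.
public class Utf8StringSchemaSketch implements SerializationSchema<String, byte[]> {

	private static final long serialVersionUID = 1L;

	@Override
	public byte[] serialize(String element) {
		return element.getBytes(StandardCharsets.UTF_8);
	}
}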
[2/2] flink git commit: [FLINK-2372] Add new FlinkKafkaProducer based on the new producer API
Posted by se...@apache.org.
[FLINK-2372] Add new FlinkKafkaProducer based on the new producer API
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/563ee9ad
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/563ee9ad
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/563ee9ad
Branch: refs/heads/release-0.10.0-milestone-1
Commit: 563ee9ad8dcf03142721b212acde91a68dc4a4d5
Parents: 41a1fdd
Author: Robert Metzger <rm...@apache.org>
Authored: Fri Aug 28 14:33:49 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Sep 7 15:44:18 2015 +0200
----------------------------------------------------------------------
.../connectors/kafka/FlinkKafkaProducer.java | 225 ++++
.../streaming/connectors/kafka/KafkaSink.java | 187 ---
.../kafka/SerializableKafkaPartitioner.java | 25 -
.../connectors/kafka/api/KafkaSink.java | 3 +-
.../kafka/partitioner/FixedPartitioner.java | 80 ++
.../kafka/partitioner/KafkaPartitioner.java | 42 +
.../KafkaConsumerPartitionAssignmentTest.java | 2 -
.../connectors/kafka/KafkaConsumerTest.java | 3 +-
.../connectors/kafka/KafkaConsumerTestBase.java | 1096 ++++++++----------
.../streaming/connectors/kafka/KafkaITCase.java | 23 +-
.../connectors/kafka/KafkaProducerITCase.java | 8 +-
.../connectors/kafka/KafkaTestBase.java | 32 +-
.../connectors/kafka/TestFixedPartitioner.java | 104 ++
.../internals/ZookeeperOffsetHandlerTest.java | 1 -
.../kafka/testutils/DataGenerators.java | 19 +-
.../kafka/testutils/Tuple2Partitioner.java | 6 +-
.../util/serialization/SerializationSchema.java | 2 +-
17 files changed, 1023 insertions(+), 835 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/563ee9ad/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
new file mode 100644
index 0000000..3d666ee
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.NetUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+
+import java.util.List;
+
+
+/**
+ * Flink Sink to produce data into a Kafka topic.
+ *
+ * Please note that this producer does not have any reliability guarantees.
+ *
+ * @param <IN> Type of the messages to write into Kafka.
+ */
+public class FlinkKafkaProducer<IN> extends RichSinkFunction<IN> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer.class);
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Array with the partition ids of the given topicId
+ * The size of this array is the number of partitions
+ */
+ private final int[] partitions;
+
+ /**
+ * User defined properties for the Producer
+ */
+ private final Properties producerConfig;
+
+ /**
+ * The name of the topic this producer is writing data to
+ */
+ private String topicId;
+
+ /**
+ * (Serializable) SerializationSchema for turning objects used with Flink into
+ * byte[] for Kafka.
+ */
+ private SerializationSchema<IN, byte[]> schema;
+
+ /**
+ * User-provided partitioner for assigning an object to a Kafka partition.
+ */
+ private KafkaPartitioner partitioner;
+
+ // -------------------------------- Runtime fields ------------------------------------------
+
+ /**
+ * KafkaProducer instance.
+ */
+ private transient KafkaProducer<byte[], byte[]> producer;
+
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+ * the topic.
+ *
+ * @param brokerList
+ * Comma separated addresses of the brokers
+ * @param topicId
+ * ID of the Kafka topic.
+ * @param serializationSchema
+ * User defined serialization schema.
+ */
+ public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema) {
+ this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), null);
+ }
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+ * the topic.
+ *
+ * @param topicId
+ * ID of the Kafka topic.
+ * @param serializationSchema
+ * User defined serialization schema.
+ * @param producerConfig
+ * Properties with the producer configuration.
+ */
+ public FlinkKafkaProducer(String topicId, SerializationSchema<IN, byte[]> serializationSchema, Properties producerConfig) {
+ this(topicId, serializationSchema, producerConfig, null);
+ }
+
+ /**
+ * The main constructor for creating a FlinkKafkaProducer.
+ *
+ * @param topicId The topic to write data to
+ * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[]
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+ */
+ public FlinkKafkaProducer(String topicId, SerializationSchema<IN, byte[]> serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner) {
+ Preconditions.checkNotNull(topicId, "TopicID not set");
+ Preconditions.checkNotNull(serializationSchema, "serializationSchema not set");
+ Preconditions.checkNotNull(producerConfig, "producerConfig not set");
+ ClosureCleaner.ensureSerializable(customPartitioner);
+ ClosureCleaner.ensureSerializable(serializationSchema);
+
+ this.topicId = topicId;
+ this.schema = serializationSchema;
+ this.producerConfig = producerConfig;
+
+ // set the producer configuration properties.
+
+ if(!producerConfig.contains(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
+ this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
+ } else {
+ LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
+ }
+
+ if(!producerConfig.contains(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
+ this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
+ } else {
+ LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+ }
+
+
+ // create a local KafkaProducer to get the list of partitions.
+ // this will also ensure locally that all required ProducerConfig values are set.
+ {
+ KafkaProducer<Void, IN> getPartitionsProd = new KafkaProducer<>(this.producerConfig);
+ List<PartitionInfo> partitionsList = getPartitionsProd.partitionsFor(topicId);
+
+ this.partitions = new int[partitionsList.size()];
+ for (int i = 0; i < partitions.length; i++) {
+ partitions[i] = partitionsList.get(i).partition();
+ }
+ getPartitionsProd.close();
+ }
+
+ if(customPartitioner == null) {
+ this.partitioner = new FixedPartitioner();
+ } else {
+ this.partitioner = customPartitioner;
+ }
+ }
+
+
+ /**
+ * Initializes the connection to Kafka.
+ */
+ @Override
+ public void open(Configuration configuration) {
+ producer = new org.apache.kafka.clients.producer.KafkaProducer<>(this.producerConfig);
+
+ partitioner.open(getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks(), partitions);
+
+ LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into topic {}", getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks(), topicId);
+ }
+
+ /**
+ * Called when new data arrives to the sink, and forwards it to Kafka.
+ *
+ * @param next
+ * The incoming data
+ */
+ @Override
+ public void invoke(IN next) {
+ byte[] serialized = schema.serialize(next);
+
+ producer.send(new ProducerRecord<byte[], byte[]>(topicId,
+ partitioner.partition(next, partitions.length),
+ null,
+ serialized),
+ new ErrorLoggingCallback(topicId, null, serialized, false));
+ }
+
+
+ @Override
+ public void close() {
+ if (producer != null) {
+ producer.close();
+ }
+ }
+
+
+ // ----------------------------------- Utilities --------------------------
+
+ public static Properties getPropertiesFromBrokerList(String brokerList) {
+ String[] elements = brokerList.split(",");
+ for(String broker: elements) {
+ NetUtils.getCorrectHostnamePort(broker);
+ }
+ Properties props = new Properties();
+ props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+ return props;
+ }
+}
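To give a sense of how the new sink is used from a streaming job (a usage sketch only, not part of this commit; the broker address, topic name, and example elements are assumptions), the simplest constructor shown above can be wired in as follows:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;

public class FlinkKafkaProducerUsageSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// assumed example input
		DataStream<String> stream = env.fromElements("a", "b", "c");

		// broker list, topic, and schema: the three-argument constructor from the diff above
		stream.addSink(new FlinkKafkaProducer<String>("localhost:9092", "sketch-topic",
				new JavaDefaultStringSchema()));

		env.execute("FlinkKafkaProducer usage sketch");
	}
}

Passing a Properties object with "bootstrap.servers" set (see getPropertiesFromBrokerList above) plus an optional KafkaPartitioner covers the other constructors.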
http://git-wip-us.apache.org/repos/asf/flink/blob/563ee9ad/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
deleted file mode 100644
index d115913..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import com.google.common.base.Preconditions;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-import kafka.serializer.DefaultEncoder;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.connectors.kafka.internals.PartitionerWrapper;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.flink.util.NetUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.Properties;
-
-
-/**
- * Sink that emits its inputs to a Kafka topic.
- *
- * @param <IN>
- * Type of the sink input
- */
-public class KafkaSink<IN> extends RichSinkFunction<IN> {
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
-
- private Producer<IN, byte[]> producer;
- private Properties userDefinedProperties;
- private String topicId;
- private String brokerList;
- private SerializationSchema<IN, byte[]> schema;
- private SerializableKafkaPartitioner partitioner;
- private Class<? extends SerializableKafkaPartitioner> partitionerClass = null;
-
- /**
- * Creates a KafkaSink for a given topic. The sink produces its input to
- * the topic.
- *
- * @param brokerList
- * Addresses of the brokers
- * @param topicId
- * ID of the Kafka topic.
- * @param serializationSchema
- * User defined serialization schema.
- */
- public KafkaSink(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema) {
- this(brokerList, topicId, new Properties(), serializationSchema);
- }
-
- /**
- * Creates a KafkaSink for a given topic with custom Producer configuration.
- * If you use this constructor, the broker should be set with the "metadata.broker.list"
- * configuration.
- *
- * @param brokerList
- * Addresses of the brokers
- * @param topicId
- * ID of the Kafka topic.
- * @param producerConfig
- * Configurations of the Kafka producer
- * @param serializationSchema
- * User defined serialization schema.
- */
- public KafkaSink(String brokerList, String topicId, Properties producerConfig, SerializationSchema<IN, byte[]> serializationSchema) {
- String[] elements = brokerList.split(",");
- for(String broker: elements) {
- NetUtils.getCorrectHostnamePort(broker);
- }
- Preconditions.checkNotNull(topicId, "TopicID not set");
-
- this.brokerList = brokerList;
- this.topicId = topicId;
- this.schema = serializationSchema;
- this.partitionerClass = null;
- this.userDefinedProperties = producerConfig;
- }
-
- /**
- * Creates a KafkaSink for a given topic. The sink produces its input to
- * the topic.
- *
- * @param brokerList
- * @param topicId
- * ID of the Kafka topic.
- * @param serializationSchema
- * User defined serialization schema.
- * @param partitioner
- * User defined partitioner.
- */
- public KafkaSink(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema, SerializableKafkaPartitioner partitioner) {
- this(brokerList, topicId, serializationSchema);
- ClosureCleaner.ensureSerializable(partitioner);
- this.partitioner = partitioner;
- }
-
- public KafkaSink(String brokerList,
- String topicId,
- SerializationSchema<IN, byte[]> serializationSchema,
- Class<? extends SerializableKafkaPartitioner> partitioner) {
- this(brokerList, topicId, serializationSchema);
- this.partitionerClass = partitioner;
- }
-
- /**
- * Initializes the connection to Kafka.
- */
- @Override
- public void open(Configuration configuration) {
-
- Properties properties = new Properties();
-
- properties.put("metadata.broker.list", brokerList);
- properties.put("request.required.acks", "-1");
- properties.put("message.send.max.retries", "10");
-
- properties.put("serializer.class", DefaultEncoder.class.getCanonicalName());
-
- // this will not be used as the key will not be serialized
- properties.put("key.serializer.class", DefaultEncoder.class.getCanonicalName());
-
- for (Map.Entry<Object, Object> propertiesEntry : userDefinedProperties.entrySet()) {
- properties.put(propertiesEntry.getKey(), propertiesEntry.getValue());
- }
-
- if (partitioner != null) {
- properties.put("partitioner.class", PartitionerWrapper.class.getCanonicalName());
- // java serialization will do the rest.
- properties.put(PartitionerWrapper.SERIALIZED_WRAPPER_NAME, partitioner);
- }
- if (partitionerClass != null) {
- properties.put("partitioner.class", partitionerClass);
- }
-
- ProducerConfig config = new ProducerConfig(properties);
-
- try {
- producer = new Producer<IN, byte[]>(config);
- } catch (NullPointerException e) {
- throw new RuntimeException("Cannot connect to Kafka broker " + brokerList, e);
- }
- }
-
- /**
- * Called when new data arrives to the sink, and forwards it to Kafka.
- *
- * @param next
- * The incoming data
- */
- @Override
- public void invoke(IN next) {
- byte[] serialized = schema.serialize(next);
-
- // Sending message without serializable key.
- producer.send(new KeyedMessage<IN, byte[]>(topicId, null, next, serialized));
- }
-
- @Override
- public void close() {
- if (producer != null) {
- producer.close();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/563ee9ad/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/SerializableKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/SerializableKafkaPartitioner.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/SerializableKafkaPartitioner.java
deleted file mode 100644
index 7b9f991..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/SerializableKafkaPartitioner.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import kafka.producer.Partitioner;
-
-import java.io.Serializable;
-
-public interface SerializableKafkaPartitioner extends Serializable, Partitioner {
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/563ee9ad/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
index c8400a5..f856926 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.connectors.kafka.api;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
/**
@@ -26,7 +27,7 @@ import org.apache.flink.streaming.util.serialization.SerializationSchema;
* This class will be removed in future releases of Flink.
*/
@Deprecated
-public class KafkaSink<IN> extends org.apache.flink.streaming.connectors.kafka.KafkaSink<IN> {
+public class KafkaSink<IN> extends FlinkKafkaProducer<IN> {
public KafkaSink(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema) {
super(brokerList, topicId, serializationSchema);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/563ee9ad/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
new file mode 100644
index 0000000..346a7d5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+import java.io.Serializable;
+
+/**
+ * A partitioner ensuring that each internal Flink partition ends up in one Kafka partition.
+ *
+ * Note, one Kafka partition can contain multiple Flink partitions.
+ *
+ * Cases:
+ * # More Flink partitions than kafka partitions
+ * <pre>
+ * Flink Sinks: Kafka Partitions
+ * 1 ----------------> 1
+ * 2 --------------/
+ * 3 -------------/
+ * 4 ------------/
+ * </pre>
+ * --> Some (or all) kafka partitions contain the output of more than one flink partition
+ *
+ *# Fewer Flink partitions than Kafka
+ * <pre>
+ * Flink Sinks: Kafka Partitions
+ * 1 ----------------> 1
+ * 2 ----------------> 2
+ * 3
+ * 4
+ * 5
+ * </pre>
+ *
+ * --> Not all Kafka partitions contain data
+ * To avoid such an unbalanced partitioning, use a round-robin kafka partitioner. (note that this will
+ * cause a lot of network connections between all the Flink instances and all the Kafka brokers).
+ *
+ *
+ */
+public class FixedPartitioner extends KafkaPartitioner implements Serializable {
+ private static final long serialVersionUID = 1627268846962918126L;
+
+ int targetPartition = -1;
+
+ @Override
+ public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
+ int p = 0;
+ for(int i = 0; i < parallelInstances; i++) {
+ if(i == parallelInstanceId) {
+ targetPartition = partitions[p];
+ return;
+ }
+ if(++p == partitions.length) {
+ p = 0;
+ }
+ }
+ }
+
+ @Override
+ public int partition(Object element, int numPartitions) {
+ if(targetPartition == -1) {
+ throw new RuntimeException("The partitioner has not been initialized properly");
+ }
+ return targetPartition;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/563ee9ad/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
new file mode 100644
index 0000000..55519f0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+
+import kafka.producer.Partitioner;
+
+import java.io.Serializable;
+
+/**
+ * Extended Kafka Partitioner.
+ * It contains an open() method which is called on each parallel instance.
+ * Partitioners have to be serializable!
+ */
+public abstract class KafkaPartitioner implements Partitioner, Serializable {
+
+ private static final long serialVersionUID = -1974260817778593473L;
+
+ /**
+ * Initializer for the Partitioner.
+ * @param parallelInstanceId 0-indexed id of the parallel instance in Flink
+ * @param parallelInstances the total number of parallel instances
+ * @param partitions an array describing the partition IDs of the available Kafka partitions.
+ */
+ public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
+ // overwrite this method if needed.
+ }
+}
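The FixedPartitioner Javadoc earlier in this commit mentions a round-robin Kafka partitioner as the way to avoid unbalanced output. A minimal sketch of such a partitioner on top of the KafkaPartitioner base class above (illustrative only, not part of this commit; the class name is made up) could be:

import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;

// Cycles through all available partitions, ignoring the element itself.
public class RoundRobinPartitionerSketch extends KafkaPartitioner {

	private static final long serialVersionUID = 1L;

	private int nextPartition = 0;

	@Override
	public int partition(Object element, int numPartitions) {
		int p = nextPartition;
		nextPartition = (nextPartition + 1) % numPartitions;
		return p;
	}
}

Each parallel sink instance keeps its own counter, so the distribution is round-robin per subtask rather than globally.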
http://git-wip-us.apache.org/repos/asf/flink/blob/563ee9ad/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
index 3d392aa..9281e04 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
@@ -18,8 +18,6 @@
package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/flink/blob/563ee9ad/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
index e35fcfb..cf745b7 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
@@ -20,9 +20,9 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.commons.collections.map.LinkedMap;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+
import org.junit.Ignore;
import org.junit.Test;
@@ -124,7 +124,6 @@ public class KafkaConsumerTest {
}
@Test
- @Ignore("Kafka consumer internally makes an infinite loop")
public void testCreateSourceWithoutCluster() {
try {
Properties props = new Properties();
http://git-wip-us.apache.org/repos/asf/flink/blob/563ee9ad/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 939f564..8676d5d 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.connectors.kafka;
-
import kafka.admin.AdminUtils;
import kafka.api.PartitionMetadata;
import kafka.consumer.Consumer;
@@ -67,6 +66,7 @@ import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
import org.apache.flink.util.Collector;
+import org.apache.kafka.clients.producer.ProducerConfig;
import org.junit.Assert;
import scala.collection.Seq;
@@ -113,67 +113,61 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
/**
* Test that validates that checkpointing and checkpoint notification works properly
*/
- public void runCheckpointingTest() {
- try {
- createTestTopic("testCheckpointing", 1, 1);
+ public void runCheckpointingTest() throws Exception {
+ createTestTopic("testCheckpointing", 1, 1);
- FlinkKafkaConsumer<String> source = getConsumer("testCheckpointing", new JavaDefaultStringSchema(), standardProps);
- Field pendingCheckpointsField = FlinkKafkaConsumer.class.getDeclaredField("pendingCheckpoints");
- pendingCheckpointsField.setAccessible(true);
- LinkedMap pendingCheckpoints = (LinkedMap) pendingCheckpointsField.get(source);
+ FlinkKafkaConsumer<String> source = getConsumer("testCheckpointing", new JavaDefaultStringSchema(), standardProps);
+ Field pendingCheckpointsField = FlinkKafkaConsumer.class.getDeclaredField("pendingCheckpoints");
+ pendingCheckpointsField.setAccessible(true);
+ LinkedMap pendingCheckpoints = (LinkedMap) pendingCheckpointsField.get(source);
- Assert.assertEquals(0, pendingCheckpoints.size());
- source.setRuntimeContext(new MockRuntimeContext(1, 0));
+ Assert.assertEquals(0, pendingCheckpoints.size());
+ source.setRuntimeContext(new MockRuntimeContext(1, 0));
- final long[] initialOffsets = new long[] { 1337 };
+ final long[] initialOffsets = new long[] { 1337 };
- // first restore
- source.restoreState(initialOffsets);
+ // first restore
+ source.restoreState(initialOffsets);
- // then open
- source.open(new Configuration());
- long[] state1 = source.snapshotState(1, 15);
+ // then open
+ source.open(new Configuration());
+ long[] state1 = source.snapshotState(1, 15);
- assertArrayEquals(initialOffsets, state1);
+ assertArrayEquals(initialOffsets, state1);
- long[] state2 = source.snapshotState(2, 30);
- Assert.assertArrayEquals(initialOffsets, state2);
- Assert.assertEquals(2, pendingCheckpoints.size());
+ long[] state2 = source.snapshotState(2, 30);
+ Assert.assertArrayEquals(initialOffsets, state2);
+ Assert.assertEquals(2, pendingCheckpoints.size());
- source.notifyCheckpointComplete(1);
- Assert.assertEquals(1, pendingCheckpoints.size());
+ source.notifyCheckpointComplete(1);
+ Assert.assertEquals(1, pendingCheckpoints.size());
- source.notifyCheckpointComplete(2);
- Assert.assertEquals(0, pendingCheckpoints.size());
+ source.notifyCheckpointComplete(2);
+ Assert.assertEquals(0, pendingCheckpoints.size());
- source.notifyCheckpointComplete(666); // invalid checkpoint
- Assert.assertEquals(0, pendingCheckpoints.size());
+ source.notifyCheckpointComplete(666); // invalid checkpoint
+ Assert.assertEquals(0, pendingCheckpoints.size());
- // create 500 snapshots
- for (int i = 100; i < 600; i++) {
- source.snapshotState(i, 15 * i);
- }
- Assert.assertEquals(FlinkKafkaConsumer.MAX_NUM_PENDING_CHECKPOINTS, pendingCheckpoints.size());
+ // create 500 snapshots
+ for (int i = 100; i < 600; i++) {
+ source.snapshotState(i, 15 * i);
+ }
+ Assert.assertEquals(FlinkKafkaConsumer.MAX_NUM_PENDING_CHECKPOINTS, pendingCheckpoints.size());
- // commit only the second last
- source.notifyCheckpointComplete(598);
- Assert.assertEquals(1, pendingCheckpoints.size());
+ // commit only the second last
+ source.notifyCheckpointComplete(598);
+ Assert.assertEquals(1, pendingCheckpoints.size());
- // access invalid checkpoint
- source.notifyCheckpointComplete(590);
+ // access invalid checkpoint
+ source.notifyCheckpointComplete(590);
- // and the last
- source.notifyCheckpointComplete(599);
- Assert.assertEquals(0, pendingCheckpoints.size());
+ // and the last
+ source.notifyCheckpointComplete(599);
+ Assert.assertEquals(0, pendingCheckpoints.size());
- source.close();
+ source.close();
- deleteTestTopic("testCheckpointing");
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ deleteTestTopic("testCheckpointing");
}
/**
@@ -181,70 +175,64 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
*
* This test is only applicable if the Flink Kafka Consumer uses the ZooKeeperOffsetHandler.
*/
- public void runOffsetInZookeeperValidationTest() {
- try {
- LOG.info("Starting testFlinkKafkaConsumerWithOffsetUpdates()");
+ public void runOffsetInZookeeperValidationTest() throws Exception {
+ LOG.info("Starting testFlinkKafkaConsumerWithOffsetUpdates()");
- final String topicName = "testOffsetHacking";
- final int parallelism = 3;
-
- createTestTopic(topicName, parallelism, 1);
+ final String topicName = "testOffsetHacking";
+ final int parallelism = 3;
- StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env1.getConfig().disableSysoutLogging();
- env1.enableCheckpointing(50);
- env1.setNumberOfExecutionRetries(0);
- env1.setParallelism(parallelism);
+ createTestTopic(topicName, parallelism, 1);
- StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env2.getConfig().disableSysoutLogging();
- env2.enableCheckpointing(50);
- env2.setNumberOfExecutionRetries(0);
- env2.setParallelism(parallelism);
+ StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env1.getConfig().disableSysoutLogging();
+ env1.enableCheckpointing(50);
+ env1.setNumberOfExecutionRetries(0);
+ env1.setParallelism(parallelism);
- StreamExecutionEnvironment env3 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env3.getConfig().disableSysoutLogging();
- env3.enableCheckpointing(50);
- env3.setNumberOfExecutionRetries(0);
- env3.setParallelism(parallelism);
+ StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env2.getConfig().disableSysoutLogging();
+ env2.enableCheckpointing(50);
+ env2.setNumberOfExecutionRetries(0);
+ env2.setParallelism(parallelism);
- // write a sequence from 0 to 99 to each of the 3 partitions.
- writeSequence(env1, topicName, 100, parallelism);
+ StreamExecutionEnvironment env3 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env3.getConfig().disableSysoutLogging();
+ env3.enableCheckpointing(50);
+ env3.setNumberOfExecutionRetries(0);
+ env3.setParallelism(parallelism);
- readSequence(env2, standardProps, parallelism, topicName, 100, 0);
+ // write a sequence from 0 to 99 to each of the 3 partitions.
+ writeSequence(env1, topicName, 100, parallelism);
- ZkClient zkClient = createZookeeperClient();
-
- long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 0);
- long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 1);
- long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 2);
+ readSequence(env2, standardProps, parallelism, topicName, 100, 0);
- LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3);
+ ZkClient zkClient = createZookeeperClient();
- assertTrue(o1 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100));
- assertTrue(o2 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100));
- assertTrue(o3 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100));
+ long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 0);
+ long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 1);
+ long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 2);
- LOG.info("Manipulating offsets");
+ LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3);
- // set the offset to 50 for the three partitions
- ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topicName, 0, 49);
- ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topicName, 1, 49);
- ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topicName, 2, 49);
+ assertTrue(o1 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100));
+ assertTrue(o2 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o2 >= 0 && o2 <= 100));
+ assertTrue(o3 == FlinkKafkaConsumer.OFFSET_NOT_SET || (o3 >= 0 && o3 <= 100));
- zkClient.close();
-
- // create new env
- readSequence(env3, standardProps, parallelism, topicName, 50, 50);
+ LOG.info("Manipulating offsets");
- deleteTestTopic(topicName);
-
- LOG.info("Finished testFlinkKafkaConsumerWithOffsetUpdates()");
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ // set the committed offset to 49 for the three partitions, so that consumption resumes at element 50
+ ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topicName, 0, 49);
+ ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topicName, 1, 49);
+ ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topicName, 2, 49);
+
+ zkClient.close();
+
+ // create new env
+ readSequence(env3, standardProps, parallelism, topicName, 50, 50);
+
+ deleteTestTopic(topicName);
+
+ LOG.info("Finished testFlinkKafkaConsumerWithOffsetUpdates()");
}
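For illustration, a minimal sketch (not part of this commit) of the ZooKeeper offset handling exercised above: a small helper that rewinds the committed offset of every partition of a topic for a consumer group, using the same ZookeeperOffsetHandler calls as the test. The helper class, its import paths and the partition-count handling are assumptions, not committed API.

import org.I0Itec.zkclient.ZkClient;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;

public class OffsetRewindSketch {

	// Rewinds the committed offset of every partition so that consumption resumes
	// right after "offset" (mirrors the setOffsetInZooKeeper(..., 49) pattern above).
	public static void rewind(ZkClient zkClient, String groupId, String topic,
			int numPartitions, long offset) throws Exception {

		for (int partition = 0; partition < numPartitions; partition++) {
			// read the currently committed offset (OFFSET_NOT_SET if nothing was committed yet)
			long current = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, groupId, topic, partition);

			// rewind only if something newer than the target was committed
			if (current != FlinkKafkaConsumer.OFFSET_NOT_SET && current > offset) {
				ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, groupId, topic, partition, offset);
			}
		}
	}
}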
/**
@@ -255,655 +243,600 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
* (generator source) --> (kafka sink)-[KAFKA-TOPIC]-(kafka source) --> (validating sink)
* </pre>
*/
- public void runSimpleConcurrentProducerConsumerTopology() {
- try {
- LOG.info("Starting runSimpleConcurrentProducerConsumerTopology()");
+ public void runSimpleConcurrentProducerConsumerTopology() throws Exception {
+ LOG.info("Starting runSimpleConcurrentProducerConsumerTopology()");
- final String topic = "concurrentProducerConsumerTopic";
- final int parallelism = 3;
- final int elementsPerPartition = 100;
- final int totalElements = parallelism * elementsPerPartition;
+ final String topic = "concurrentProducerConsumerTopic";
+ final int parallelism = 3;
+ final int elementsPerPartition = 100;
+ final int totalElements = parallelism * elementsPerPartition;
- createTestTopic(topic, parallelism, 2);
+ createTestTopic(topic, parallelism, 2);
- final StreamExecutionEnvironment env =
- StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env.setParallelism(parallelism);
- env.setNumberOfExecutionRetries(0);
- env.getConfig().disableSysoutLogging();
+ final StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env.setParallelism(parallelism);
+ env.setNumberOfExecutionRetries(0);
+ env.getConfig().disableSysoutLogging();
- TypeInformation<Tuple2<Long, String>> longStringType = TypeInfoParser.parse("Tuple2<Long, String>");
+ TypeInformation<Tuple2<Long, String>> longStringType = TypeInfoParser.parse("Tuple2<Long, String>");
- TypeInformationSerializationSchema<Tuple2<Long, String>> sourceSchema =
- new TypeInformationSerializationSchema<>(longStringType, env.getConfig());
+ TypeInformationSerializationSchema<Tuple2<Long, String>> sourceSchema =
+ new TypeInformationSerializationSchema<>(longStringType, env.getConfig());
- TypeInformationSerializationSchema<Tuple2<Long, String>> sinkSchema =
- new TypeInformationSerializationSchema<>(longStringType, env.getConfig());
+ TypeInformationSerializationSchema<Tuple2<Long, String>> sinkSchema =
+ new TypeInformationSerializationSchema<>(longStringType, env.getConfig());
- // ----------- add producer dataflow ----------
+ // ----------- add producer dataflow ----------
- DataStream<Tuple2<Long, String>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Long,String>>() {
+ DataStream<Tuple2<Long, String>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Long,String>>() {
- private boolean running = true;
+ private boolean running = true;
- @Override
- public void run(SourceContext<Tuple2<Long, String>> ctx) {
- int cnt = getRuntimeContext().getIndexOfThisSubtask() * elementsPerPartition;
- int limit = cnt + elementsPerPartition;
+ @Override
+ public void run(SourceContext<Tuple2<Long, String>> ctx) {
+ int cnt = getRuntimeContext().getIndexOfThisSubtask() * elementsPerPartition;
+ int limit = cnt + elementsPerPartition;
- while (running && cnt < limit) {
- ctx.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt));
- cnt++;
- }
- }
-
- @Override
- public void cancel() {
- running = false;
+ while (running && cnt < limit) {
+ ctx.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt));
+ cnt++;
}
- });
- stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, sinkSchema));
-
- // ----------- add consumer dataflow ----------
-
- FlinkKafkaConsumer<Tuple2<Long, String>> source = getConsumer(topic, sourceSchema, standardProps);
+ }
- DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(source).setParallelism(parallelism);
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ });
+ stream.addSink(new FlinkKafkaProducer<Tuple2<Long, String>>(brokerConnectionStrings, topic, sinkSchema));
- consuming.addSink(new RichSinkFunction<Tuple2<Long, String>>() {
+ // ----------- add consumer dataflow ----------
- private int elCnt = 0;
- private BitSet validator = new BitSet(totalElements);
+ FlinkKafkaConsumer<Tuple2<Long, String>> source = getConsumer(topic, sourceSchema, standardProps);
- @Override
- public void invoke(Tuple2<Long, String> value) throws Exception {
- String[] sp = value.f1.split("-");
- int v = Integer.parseInt(sp[1]);
+ DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(source).setParallelism(parallelism);
- assertEquals(value.f0 - 1000, (long) v);
+ consuming.addSink(new RichSinkFunction<Tuple2<Long, String>>() {
- assertFalse("Received tuple twice", validator.get(v));
- validator.set(v);
- elCnt++;
+ private int elCnt = 0;
+ private BitSet validator = new BitSet(totalElements);
- if (elCnt == totalElements) {
- // check if everything in the bitset is set to true
- int nc;
- if ((nc = validator.nextClearBit(0)) != totalElements) {
- fail("The bitset was not set to 1 on all elements. Next clear:"
- + nc + " Set: " + validator);
- }
- throw new SuccessException();
+ @Override
+ public void invoke(Tuple2<Long, String> value) throws Exception {
+ String[] sp = value.f1.split("-");
+ int v = Integer.parseInt(sp[1]);
+
+ assertEquals(value.f0 - 1000, (long) v);
+
+ assertFalse("Received tuple twice", validator.get(v));
+ validator.set(v);
+ elCnt++;
+
+ if (elCnt == totalElements) {
+ // check if everything in the bitset is set to true
+ int nc;
+ if ((nc = validator.nextClearBit(0)) != totalElements) {
+ fail("The bitset was not set to 1 on all elements. Next clear:"
+ + nc + " Set: " + validator);
}
+ throw new SuccessException();
}
+ }
- @Override
- public void close() throws Exception {
- super.close();
- }
- }).setParallelism(1);
+ @Override
+ public void close() throws Exception {
+ super.close();
+ }
+ }).setParallelism(1);
- tryExecute(env, "runSimpleConcurrentProducerConsumerTopology");
+ tryExecute(env, "runSimpleConcurrentProducerConsumerTopology");
- LOG.info("Finished runSimpleConcurrentProducerConsumerTopology()");
+ LOG.info("Finished runSimpleConcurrentProducerConsumerTopology()");
- deleteTestTopic(topic);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ deleteTestTopic(topic);
}
/**
* Tests the proper consumption when having a 1:1 correspondence between kafka partitions and
* Flink sources.
*/
- public void runOneToOneExactlyOnceTest() {
- try {
- LOG.info("Starting runOneToOneExactlyOnceTest()");
+ public void runOneToOneExactlyOnceTest() throws Exception {
+ LOG.info("Starting runOneToOneExactlyOnceTest()");
- final String topic = "oneToOneTopic";
- final int parallelism = 5;
- final int numElementsPerPartition = 1000;
- final int totalElements = parallelism * numElementsPerPartition;
- final int failAfterElements = numElementsPerPartition / 3;
-
- createTestTopic(topic, parallelism, 1);
-
- DataGenerators.generateRandomizedIntegerSequence(
- StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
- brokerConnectionStrings,
- topic, parallelism, numElementsPerPartition, true);
-
- // run the topology that fails and recovers
+ final String topic = "oneToOneTopic";
+ final int parallelism = 5;
+ final int numElementsPerPartition = 1000;
+ final int totalElements = parallelism * numElementsPerPartition;
+ final int failAfterElements = numElementsPerPartition / 3;
- DeserializationSchema<Integer> schema =
- new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env.enableCheckpointing(500);
- env.setParallelism(parallelism);
- env.setNumberOfExecutionRetries(3);
- env.getConfig().disableSysoutLogging();
-
- FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
-
- env
- .addSource(kafkaSource)
- .map(new PartitionValidatingMapper(parallelism, 1))
- .map(new FailingIdentityMapper<Integer>(failAfterElements))
- .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
+ createTestTopic(topic, parallelism, 1);
+
+ DataGenerators.generateRandomizedIntegerSequence(
+ StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
+ brokerConnectionStrings,
+ topic, parallelism, numElementsPerPartition, true);
+
+ // run the topology that fails and recovers
- FailingIdentityMapper.failedBefore = false;
- tryExecute(env, "One-to-one exactly once test");
+ DeserializationSchema<Integer> schema =
+ new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
- // this cannot be reliably checked, as checkpoints come in time intervals, and
- // failures after a number of elements
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env.enableCheckpointing(500);
+ env.setParallelism(parallelism);
+ env.setNumberOfExecutionRetries(3);
+ env.getConfig().disableSysoutLogging();
+
+ FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
+
+ env
+ .addSource(kafkaSource)
+ .map(new PartitionValidatingMapper(parallelism, 1))
+ .map(new FailingIdentityMapper<Integer>(failAfterElements))
+ .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
+
+ FailingIdentityMapper.failedBefore = false;
+ tryExecute(env, "One-to-one exactly once test");
+
+ // this cannot be reliably checked, as checkpoints come in time intervals, and
+ // failures after a number of elements
// assertTrue("Job did not do a checkpoint before the failure",
// FailingIdentityMapper.hasBeenCheckpointedBeforeFailure);
-
- deleteTestTopic(topic);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+
+ deleteTestTopic(topic);
}
/**
* Tests the proper consumption when having fewer Flink sources than Kafka partitions, so
* one Flink source will read multiple Kafka partitions.
*/
- public void runOneSourceMultiplePartitionsExactlyOnceTest() {
- try {
- LOG.info("Starting runOneSourceMultiplePartitionsExactlyOnceTest()");
+ public void runOneSourceMultiplePartitionsExactlyOnceTest() throws Exception {
+ LOG.info("Starting runOneSourceMultiplePartitionsExactlyOnceTest()");
- final String topic = "oneToManyTopic";
- final int numPartitions = 5;
- final int numElementsPerPartition = 1000;
- final int totalElements = numPartitions * numElementsPerPartition;
- final int failAfterElements = numElementsPerPartition / 3;
-
- final int parallelism = 2;
+ final String topic = "oneToManyTopic";
+ final int numPartitions = 5;
+ final int numElementsPerPartition = 1000;
+ final int totalElements = numPartitions * numElementsPerPartition;
+ final int failAfterElements = numElementsPerPartition / 3;
- createTestTopic(topic, numPartitions, 1);
+ final int parallelism = 2;
- DataGenerators.generateRandomizedIntegerSequence(
- StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
- brokerConnectionStrings,
- topic, numPartitions, numElementsPerPartition, true);
+ createTestTopic(topic, numPartitions, 1);
- // run the topology that fails and recovers
+ DataGenerators.generateRandomizedIntegerSequence(
+ StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
+ brokerConnectionStrings,
+ topic, numPartitions, numElementsPerPartition, true);
- DeserializationSchema<Integer> schema =
- new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+ // run the topology that fails and recovers
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env.enableCheckpointing(500);
- env.setParallelism(parallelism);
- env.setNumberOfExecutionRetries(3);
- env.getConfig().disableSysoutLogging();
+ DeserializationSchema<Integer> schema =
+ new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
- FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env.enableCheckpointing(500);
+ env.setParallelism(parallelism);
+ env.setNumberOfExecutionRetries(3);
+ env.getConfig().disableSysoutLogging();
- env
- .addSource(kafkaSource)
- .map(new PartitionValidatingMapper(numPartitions, 3))
- .map(new FailingIdentityMapper<Integer>(failAfterElements))
- .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
+ FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
- FailingIdentityMapper.failedBefore = false;
- tryExecute(env, "One-source-multi-partitions exactly once test");
+ env
+ .addSource(kafkaSource)
+ .map(new PartitionValidatingMapper(numPartitions, 3))
+ .map(new FailingIdentityMapper<Integer>(failAfterElements))
+ .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
- // this cannot be reliably checked, as checkpoints come in time intervals, and
- // failures after a number of elements
+ FailingIdentityMapper.failedBefore = false;
+ tryExecute(env, "One-source-multi-partitions exactly once test");
+
+ // this cannot be reliably checked, as checkpoints come in time intervals, and
+ // failures after a number of elements
// assertTrue("Job did not do a checkpoint before the failure",
// FailingIdentityMapper.hasBeenCheckpointedBeforeFailure);
-
- deleteTestTopic(topic);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+
+ deleteTestTopic(topic);
}
/**
* Tests the proper consumption when having more Flink sources than Kafka partitions, which means
* that some Flink sources will read no partitions.
*/
- public void runMultipleSourcesOnePartitionExactlyOnceTest() {
- try {
- LOG.info("Starting runMultipleSourcesOnePartitionExactlyOnceTest()");
+ public void runMultipleSourcesOnePartitionExactlyOnceTest() throws Exception {
+ LOG.info("Starting runMultipleSourcesOnePartitionExactlyOnceTest()");
- final String topic = "manyToOneTopic";
- final int numPartitions = 5;
- final int numElementsPerPartition = 1000;
- final int totalElements = numPartitions * numElementsPerPartition;
- final int failAfterElements = numElementsPerPartition / 3;
+ final String topic = "manyToOneTopic";
+ final int numPartitions = 5;
+ final int numElementsPerPartition = 1000;
+ final int totalElements = numPartitions * numElementsPerPartition;
+ final int failAfterElements = numElementsPerPartition / 3;
- final int parallelism = 8;
+ final int parallelism = 8;
- createTestTopic(topic, numPartitions, 1);
+ createTestTopic(topic, numPartitions, 1);
- DataGenerators.generateRandomizedIntegerSequence(
- StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
- brokerConnectionStrings,
- topic, numPartitions, numElementsPerPartition, true);
+ DataGenerators.generateRandomizedIntegerSequence(
+ StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
+ brokerConnectionStrings,
+ topic, numPartitions, numElementsPerPartition, true);
- // run the topology that fails and recovers
-
- DeserializationSchema<Integer> schema =
- new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+ // run the topology that fails and recovers
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env.enableCheckpointing(500);
- env.setParallelism(parallelism);
- env.setNumberOfExecutionRetries(3);
- env.getConfig().disableSysoutLogging();
- env.setBufferTimeout(0);
+ DeserializationSchema<Integer> schema =
+ new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
- FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
-
- env
- .addSource(kafkaSource)
- .map(new PartitionValidatingMapper(numPartitions, 1))
- .map(new FailingIdentityMapper<Integer>(failAfterElements))
- .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
-
- FailingIdentityMapper.failedBefore = false;
- tryExecute(env, "multi-source-one-partitions exactly once test");
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env.enableCheckpointing(500);
+ env.setParallelism(parallelism);
+ env.setNumberOfExecutionRetries(3);
+ env.getConfig().disableSysoutLogging();
+ env.setBufferTimeout(0);
+
+ FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
- // this cannot be reliably checked, as checkpoints come in time intervals, and
- // failures after a number of elements
+ env
+ .addSource(kafkaSource)
+ .map(new PartitionValidatingMapper(numPartitions, 1))
+ .map(new FailingIdentityMapper<Integer>(failAfterElements))
+ .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
+
+ FailingIdentityMapper.failedBefore = false;
+ tryExecute(env, "multi-source-one-partitions exactly once test");
+
+ // this cannot be reliably checked, as checkpoints come in time intervals, and
+ // failures after a number of elements
// assertTrue("Job did not do a checkpoint before the failure",
// FailingIdentityMapper.hasBeenCheckpointedBeforeFailure);
-
- deleteTestTopic(topic);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+
+ deleteTestTopic(topic);
}
/**
* Tests that the source can be properly canceled when reading full partitions.
*/
- public void runCancelingOnFullInputTest() {
- try {
- final String topic = "cancelingOnFullTopic";
+ public void runCancelingOnFullInputTest() throws Exception {
+ final String topic = "cancelingOnFullTopic";
- final int parallelism = 3;
- createTestTopic(topic, parallelism, 1);
+ final int parallelism = 3;
+ createTestTopic(topic, parallelism, 1);
- // launch a producer thread
- DataGenerators.InfiniteStringsGenerator generator =
- new DataGenerators.InfiniteStringsGenerator(brokerConnectionStrings, topic);
- generator.start();
+ // launch a producer thread
+ DataGenerators.InfiniteStringsGenerator generator =
+ new DataGenerators.InfiniteStringsGenerator(brokerConnectionStrings, topic);
+ generator.start();
- // launch a consumer asynchronously
+ // launch a consumer asynchronously
- final AtomicReference<Throwable> jobError = new AtomicReference<>();
+ final AtomicReference<Throwable> jobError = new AtomicReference<>();
- final Runnable jobRunner = new Runnable() {
- @Override
- public void run() {
- try {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env.setParallelism(parallelism);
- env.enableCheckpointing(100);
- env.getConfig().disableSysoutLogging();
+ final Runnable jobRunner = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env.setParallelism(parallelism);
+ env.enableCheckpointing(100);
+ env.getConfig().disableSysoutLogging();
- FlinkKafkaConsumer<String> source = getConsumer(topic, new JavaDefaultStringSchema(), standardProps);
+ FlinkKafkaConsumer<String> source = getConsumer(topic, new JavaDefaultStringSchema(), standardProps);
- env.addSource(source).addSink(new DiscardingSink<String>());
+ env.addSource(source).addSink(new DiscardingSink<String>());
- env.execute();
- }
- catch (Throwable t) {
- jobError.set(t);
- }
+ env.execute();
}
- };
-
- Thread runnerThread = new Thread(jobRunner, "program runner thread");
- runnerThread.start();
+ catch (Throwable t) {
+ jobError.set(t);
+ }
+ }
+ };
- // wait a bit before canceling
- Thread.sleep(2000);
+ Thread runnerThread = new Thread(jobRunner, "program runner thread");
+ runnerThread.start();
- // cancel
- JobManagerCommunicationUtils.cancelCurrentJob(flink.getJobManagerGateway());
+ // wait a bit before canceling
+ Thread.sleep(2000);
- // wait for the program to be done and validate that we failed with the right exception
- runnerThread.join();
+ // cancel
+ JobManagerCommunicationUtils.cancelCurrentJob(flink.getJobManagerGateway());
- Throwable failueCause = jobError.get();
- assertNotNull("program did not fail properly due to canceling", failueCause);
- assertTrue(failueCause.getMessage().contains("Job was cancelled"));
+ // wait for the program to be done and validate that we failed with the right exception
+ runnerThread.join();
- if (generator.isAlive()) {
- generator.shutdown();
- generator.join();
- }
- else {
- Throwable t = generator.getError();
- if (t != null) {
- t.printStackTrace();
- fail("Generator failed: " + t.getMessage());
- } else {
- fail("Generator failed with no exception");
- }
- }
+ Throwable failueCause = jobError.get();
+ assertNotNull("program did not fail properly due to canceling", failueCause);
+ assertTrue(failueCause.getMessage().contains("Job was cancelled"));
- deleteTestTopic(topic);
+ if (generator.isAlive()) {
+ generator.shutdown();
+ generator.join();
}
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
+ else {
+ Throwable t = generator.getError();
+ if (t != null) {
+ t.printStackTrace();
+ fail("Generator failed: " + t.getMessage());
+ } else {
+ fail("Generator failed with no exception");
+ }
}
+
+ deleteTestTopic(topic);
}
/**
* Tests that the source can be properly canceled when reading empty partitions.
*/
- public void runCancelingOnEmptyInputTest() {
- try {
- final String topic = "cancelingOnEmptyInputTopic";
+ public void runCancelingOnEmptyInputTest() throws Exception {
+ final String topic = "cancelingOnEmptyInputTopic";
- final int parallelism = 3;
- createTestTopic(topic, parallelism, 1);
+ final int parallelism = 3;
+ createTestTopic(topic, parallelism, 1);
- final AtomicReference<Throwable> error = new AtomicReference<>();
+ final AtomicReference<Throwable> error = new AtomicReference<>();
- final Runnable jobRunner = new Runnable() {
- @Override
- public void run() {
- try {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env.setParallelism(parallelism);
- env.enableCheckpointing(100);
- env.getConfig().disableSysoutLogging();
+ final Runnable jobRunner = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env.setParallelism(parallelism);
+ env.enableCheckpointing(100);
+ env.getConfig().disableSysoutLogging();
- FlinkKafkaConsumer<String> source = getConsumer(topic, new JavaDefaultStringSchema(), standardProps);
+ FlinkKafkaConsumer<String> source = getConsumer(topic, new JavaDefaultStringSchema(), standardProps);
- env.addSource(source).addSink(new DiscardingSink<String>());
+ env.addSource(source).addSink(new DiscardingSink<String>());
- env.execute();
- }
- catch (Throwable t) {
- error.set(t);
- }
+ env.execute();
+ }
+ catch (Throwable t) {
+ error.set(t);
}
- };
+ }
+ };
- Thread runnerThread = new Thread(jobRunner, "program runner thread");
- runnerThread.start();
+ Thread runnerThread = new Thread(jobRunner, "program runner thread");
+ runnerThread.start();
- // wait a bit before canceling
- Thread.sleep(2000);
+ // wait a bit before canceling
+ Thread.sleep(2000);
- // cancel
- JobManagerCommunicationUtils.cancelCurrentJob(flink.getJobManagerGateway());
+ // cancel
+ JobManagerCommunicationUtils.cancelCurrentJob(flink.getJobManagerGateway());
- // wait for the program to be done and validate that we failed with the right exception
- runnerThread.join();
+ // wait for the program to be done and validate that we failed with the right exception
+ runnerThread.join();
- Throwable failueCause = error.get();
- assertNotNull("program did not fail properly due to canceling", failueCause);
- assertTrue(failueCause.getMessage().contains("Job was cancelled"));
+ Throwable failueCause = error.get();
+ assertNotNull("program did not fail properly due to canceling", failueCause);
+ assertTrue(failueCause.getMessage().contains("Job was cancelled"));
- deleteTestTopic(topic);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ deleteTestTopic(topic);
}
/**
 * Tests that the job fails on deployment when the requested parallelism exceeds the available task slots.
*/
- public void runFailOnDeployTest() {
- try {
- final String topic = "failOnDeployTopic";
-
- createTestTopic(topic, 2, 1);
+ public void runFailOnDeployTest() throws Exception {
+ final String topic = "failOnDeployTopic";
- DeserializationSchema<Integer> schema =
- new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+ createTestTopic(topic, 2, 1);
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env.setParallelism(12); // needs to be more that the mini cluster has slots
- env.getConfig().disableSysoutLogging();
+ DeserializationSchema<Integer> schema =
+ new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
- FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
-
- env
- .addSource(kafkaSource)
- .addSink(new DiscardingSink<Integer>());
-
- try {
- env.execute();
- fail("this test should fail with an exception");
- }
- catch (ProgramInvocationException e) {
-
- // validate that we failed due to a NoResourceAvailableException
- Throwable cause = e.getCause();
- int depth = 0;
- boolean foundResourceException = false;
-
- while (cause != null && depth++ < 20) {
- if (cause instanceof NoResourceAvailableException) {
- foundResourceException = true;
- break;
- }
- cause = cause.getCause();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env.setParallelism(12); // needs to be more than the mini cluster has slots
+ env.getConfig().disableSysoutLogging();
+
+ FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
+
+ env
+ .addSource(kafkaSource)
+ .addSink(new DiscardingSink<Integer>());
+
+ try {
+ env.execute();
+ fail("this test should fail with an exception");
+ }
+ catch (ProgramInvocationException e) {
+
+ // validate that we failed due to a NoResourceAvailableException
+ Throwable cause = e.getCause();
+ int depth = 0;
+ boolean foundResourceException = false;
+
+ while (cause != null && depth++ < 20) {
+ if (cause instanceof NoResourceAvailableException) {
+ foundResourceException = true;
+ break;
}
-
- assertTrue("Wrong exception", foundResourceException);
+ cause = cause.getCause();
}
- deleteTestTopic(topic);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
+ assertTrue("Wrong exception", foundResourceException);
}
+
+ deleteTestTopic(topic);
}
/**
* Test Flink's Kafka integration also with very big records (30MB)
* see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
*/
- public void runBigRecordTestTopology() {
- try {
- LOG.info("Starting runBigRecordTestTopology()");
+ public void runBigRecordTestTopology() throws Exception {
+ LOG.info("Starting runBigRecordTestTopology()");
- final String topic = "bigRecordTestTopic";
- final int parallelism = 1; // otherwise, the kafka mini clusters may run out of heap space
-
- createTestTopic(topic, parallelism, 1);
+ final String topic = "bigRecordTestTopic";
+ final int parallelism = 1; // otherwise, the kafka mini clusters may run out of heap space
- final TypeInformation<Tuple2<Long, byte[]>> longBytesInfo = TypeInfoParser.parse("Tuple2<Long, byte[]>");
+ createTestTopic(topic, parallelism, 1);
- final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> serSchema =
- new TypeInformationSerializationSchema<>(longBytesInfo, new ExecutionConfig());
+ final TypeInformation<Tuple2<Long, byte[]>> longBytesInfo = TypeInfoParser.parse("Tuple2<Long, byte[]>");
- final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> deserSchema =
- new TypeInformationSerializationSchema<>(longBytesInfo, new ExecutionConfig());
+ final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> serSchema =
+ new TypeInformationSerializationSchema<>(longBytesInfo, new ExecutionConfig());
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env.setNumberOfExecutionRetries(0);
- env.getConfig().disableSysoutLogging();
- env.enableCheckpointing(100);
- env.setParallelism(parallelism);
+ final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> deserSchema =
+ new TypeInformationSerializationSchema<>(longBytesInfo, new ExecutionConfig());
- // add consuming topology:
- Properties consumerProps = new Properties();
- consumerProps.putAll(standardProps);
- consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 40));
- consumerProps.setProperty("max.partition.fetch.bytes", Integer.toString(1024 * 1024 * 40)); // for the new fetcher
- consumerProps.setProperty("queued.max.message.chunks", "1");
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env.setNumberOfExecutionRetries(0);
+ env.getConfig().disableSysoutLogging();
+ env.enableCheckpointing(100);
+ env.setParallelism(parallelism);
- FlinkKafkaConsumer<Tuple2<Long, byte[]>> source = getConsumer(topic, serSchema, consumerProps);
- DataStreamSource<Tuple2<Long, byte[]>> consuming = env.addSource(source);
+ // add consuming topology:
+ Properties consumerProps = new Properties();
+ consumerProps.putAll(standardProps);
+ consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 40));
+ consumerProps.setProperty("max.partition.fetch.bytes", Integer.toString(1024 * 1024 * 40)); // for the new fetcher
+ consumerProps.setProperty("queued.max.message.chunks", "1");
- consuming.addSink(new SinkFunction<Tuple2<Long, byte[]>>() {
+ FlinkKafkaConsumer<Tuple2<Long, byte[]>> source = getConsumer(topic, serSchema, consumerProps);
+ DataStreamSource<Tuple2<Long, byte[]>> consuming = env.addSource(source);
- private int elCnt = 0;
+ consuming.addSink(new SinkFunction<Tuple2<Long, byte[]>>() {
- @Override
- public void invoke(Tuple2<Long, byte[]> value) throws Exception {
- elCnt++;
- if (value.f0 == -1) {
- // we should have seen 11 elements now.
- if(elCnt == 11) {
- throw new SuccessException();
- } else {
- throw new RuntimeException("There have been "+elCnt+" elements");
- }
- }
- if(elCnt > 10) {
- throw new RuntimeException("More than 10 elements seen: "+elCnt);
+ private int elCnt = 0;
+
+ @Override
+ public void invoke(Tuple2<Long, byte[]> value) throws Exception {
+ elCnt++;
+ if (value.f0 == -1) {
+ // we should have seen 11 elements now.
+ if(elCnt == 11) {
+ throw new SuccessException();
+ } else {
+ throw new RuntimeException("There have been "+elCnt+" elements");
}
}
- });
+ if(elCnt > 10) {
+ throw new RuntimeException("More than 10 elements seen: "+elCnt);
+ }
+ }
+ });
- // add producing topology
- Properties producerProps = new Properties();
- producerProps.setProperty("max.message.size", Integer.toString(1024 * 1024 * 30));
-
- DataStream<Tuple2<Long, byte[]>> stream = env.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() {
+ // add producing topology
+ Properties producerProps = new Properties();
+ producerProps.setProperty("max.request.size", Integer.toString(1024 * 1024 * 30));
+ producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerConnectionStrings);
- private boolean running;
+ DataStream<Tuple2<Long, byte[]>> stream = env.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() {
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- running = true;
- }
+ private boolean running;
- @Override
- public void run(SourceContext<Tuple2<Long, byte[]>> ctx) throws Exception {
- Random rnd = new Random();
- long cnt = 0;
- int fifteenMb = 1024 * 1024 * 15;
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ running = true;
+ }
- while (running) {
- byte[] wl = new byte[fifteenMb + rnd.nextInt(fifteenMb)];
- ctx.collect(new Tuple2<Long, byte[]>(cnt++, wl));
+ @Override
+ public void run(SourceContext<Tuple2<Long, byte[]>> ctx) throws Exception {
+ Random rnd = new Random();
+ long cnt = 0;
+ int fifteenMb = 1024 * 1024 * 15;
- Thread.sleep(100);
+ while (running) {
+ byte[] wl = new byte[fifteenMb + rnd.nextInt(fifteenMb)];
+ ctx.collect(new Tuple2<Long, byte[]>(cnt++, wl));
- if (cnt == 10) {
- // signal end
- ctx.collect(new Tuple2<Long, byte[]>(-1L, new byte[]{1}));
- break;
- }
+ Thread.sleep(100);
+
+ if (cnt == 10) {
+ // signal end
+ ctx.collect(new Tuple2<Long, byte[]>(-1L, new byte[]{1}));
+ break;
}
}
+ }
- @Override
- public void cancel() {
- running = false;
- }
- });
+ @Override
+ public void cancel() {
+ running = false;
+ }
+ });
- stream.addSink(new KafkaSink<Tuple2<Long, byte[]>>(brokerConnectionStrings, topic,
- producerProps, deserSchema));
+ stream.addSink(new FlinkKafkaProducer<Tuple2<Long, byte[]>>(topic, deserSchema, producerProps));
- tryExecute(env, "big topology test");
+ tryExecute(env, "big topology test");
- deleteTestTopic(topic);
-
- LOG.info("Finished runBigRecordTestTopology()");
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ deleteTestTopic(topic);
+
+ LOG.info("Finished runBigRecordTestTopology()");
}
- public void runBrokerFailureTest() {
- try {
- LOG.info("starting runBrokerFailureTest()");
-
- final String topic = "brokerFailureTestTopic";
+ public void runBrokerFailureTest() throws Exception {
+ LOG.info("starting runBrokerFailureTest()");
- final int parallelism = 2;
- final int numElementsPerPartition = 1000;
- final int totalElements = parallelism * numElementsPerPartition;
- final int failAfterElements = numElementsPerPartition / 3;
-
+ final String topic = "brokerFailureTestTopic";
- createTestTopic(topic, parallelism, 2);
-
- DataGenerators.generateRandomizedIntegerSequence(
- StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
- brokerConnectionStrings,
- topic, parallelism, numElementsPerPartition, true);
-
- // find leader to shut down
- ZkClient zkClient = createZookeeperClient();
- PartitionMetadata firstPart = null;
- do {
- if (firstPart != null) {
- LOG.info("Unable to find leader. error code {}", firstPart.errorCode());
- // not the first try. Sleep a bit
- Thread.sleep(150);
- }
+ final int parallelism = 2;
+ final int numElementsPerPartition = 1000;
+ final int totalElements = parallelism * numElementsPerPartition;
+ final int failAfterElements = numElementsPerPartition / 3;
+
+
+ createTestTopic(topic, parallelism, 2);
- Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata();
- firstPart = partitionMetadata.head();
+ DataGenerators.generateRandomizedIntegerSequence(
+ StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort),
+ brokerConnectionStrings,
+ topic, parallelism, numElementsPerPartition, true);
+
+ // find leader to shut down
+ ZkClient zkClient = createZookeeperClient();
+ PartitionMetadata firstPart = null;
+ do {
+ if (firstPart != null) {
+ LOG.info("Unable to find leader. error code {}", firstPart.errorCode());
+ // not the first try. Sleep a bit
+ Thread.sleep(150);
}
- while (firstPart.errorCode() != 0);
- zkClient.close();
- final String leaderToShutDown = firstPart.leader().get().connectionString();
- LOG.info("Leader to shutdown {}", leaderToShutDown);
-
-
- // run the topology that fails and recovers
+ Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata();
+ firstPart = partitionMetadata.head();
+ }
+ while (firstPart.errorCode() != 0);
+ zkClient.close();
- DeserializationSchema<Integer> schema =
- new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+ final String leaderToShutDown = firstPart.leader().get().connectionString();
+ final int leaderIdToShutDown = firstPart.leader().get().id();
+ LOG.info("Leader to shutdown {}", leaderToShutDown);
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env.setParallelism(parallelism);
- env.enableCheckpointing(500);
- env.setNumberOfExecutionRetries(3);
- env.getConfig().disableSysoutLogging();
-
- FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
+ // run the topology that fails and recovers
- env
- .addSource(kafkaSource)
- .map(new PartitionValidatingMapper(parallelism, 1))
- .map(new BrokerKillingMapper<Integer>(leaderToShutDown, failAfterElements))
- .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
+ DeserializationSchema<Integer> schema =
+ new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
- BrokerKillingMapper.killedLeaderBefore = false;
- tryExecute(env, "One-to-one exactly once test");
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env.setParallelism(parallelism);
+ env.enableCheckpointing(500);
+ env.setNumberOfExecutionRetries(3);
+ env.getConfig().disableSysoutLogging();
- // this cannot be reliably checked, as checkpoints come in time intervals, and
- // failures after a number of elements
-// assertTrue("Job did not do a checkpoint before the failure",
-// BrokerKillingMapper.hasBeenCheckpointedBeforeFailure);
- LOG.info("finished runBrokerFailureTest()");
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ FlinkKafkaConsumer<Integer> kafkaSource = getConsumer(topic, schema, standardProps);
+
+ env
+ .addSource(kafkaSource)
+ .map(new PartitionValidatingMapper(parallelism, 1))
+ .map(new BrokerKillingMapper<Integer>(leaderToShutDown, failAfterElements))
+ .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
+
+ BrokerKillingMapper.killedLeaderBefore = false;
+ tryExecute(env, "One-to-one exactly once test");
+
+ // start a new broker:
+ brokers.set(leaderIdToShutDown, getKafkaServer(leaderIdToShutDown, tmpKafkaDirs.get(leaderIdToShutDown), kafkaHost, zookeeperConnectionString));
+
+ LOG.info("finished runBrokerFailureTest()");
}
// ------------------------------------------------------------------------
@@ -961,8 +894,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
LOG.info("Successfully read sequence for verification");
}
- private static void writeSequence(StreamExecutionEnvironment env, String topicName,
- final int numElements, int parallelism) throws Exception {
+ private static void writeSequence(StreamExecutionEnvironment env, String topicName, final int numElements, int parallelism) throws Exception {
TypeInformation<Tuple2<Integer, Integer>> resultType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
@@ -987,9 +919,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}
}).setParallelism(parallelism);
- stream.addSink(new KafkaSink<Tuple2<Integer, Integer>>(brokerConnectionStrings,
- topicName,
- new TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(resultType, env.getConfig()),
+ stream.addSink(new FlinkKafkaProducer<>(topicName,
+ new TypeInformationSerializationSchema<>(resultType, env.getConfig()),
+ FlinkKafkaProducer.getPropertiesFromBrokerList(brokerConnectionStrings),
new Tuple2Partitioner(parallelism)
)).setParallelism(parallelism);
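For reference, a minimal, self-contained sketch (not part of this commit) of the producer wiring used in writeSequence() above: a job that serializes a small Tuple2 stream with a TypeInformationSerializationSchema and hands it to a FlinkKafkaProducer built from getPropertiesFromBrokerList(). The topic name, broker address and the FlinkKafkaProducer import path are assumed placeholders.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;

public class ProducerSinkSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		TypeInformation<Tuple2<Integer, Integer>> resultType =
				TypeInfoParser.parse("Tuple2<Integer, Integer>");

		// a tiny in-memory source standing in for the generator used by the tests
		DataStream<Tuple2<Integer, Integer>> stream = env.fromElements(
				new Tuple2<>(0, 0), new Tuple2<>(1, 1), new Tuple2<>(2, 2));

		// "exampleTopic" and "localhost:9092" are placeholders
		stream.addSink(new FlinkKafkaProducer<Tuple2<Integer, Integer>>(
				"exampleTopic",
				new TypeInformationSerializationSchema<>(resultType, env.getConfig()),
				FlinkKafkaProducer.getPropertiesFromBrokerList("localhost:9092")));

		env.execute("producer sink sketch");
	}
}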
@@ -1038,8 +970,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}
private static void printTopic(String topicName, ConsumerConfig config,
- DeserializationSchema<?> deserializationSchema,
- int stopAfter) {
+ DeserializationSchema<?> deserializationSchema,
+ int stopAfter) {
List<MessageAndMetadata<byte[], byte[]>> contents = readTopicToList(topicName, config, stopAfter);
LOG.info("Printing contents of topic {} in consumer grouo {}", topicName, config.groupId());
http://git-wip-us.apache.org/repos/asf/flink/blob/563ee9ad/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index 0287392..e88d35a 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -23,7 +23,6 @@ import org.junit.Test;
import java.util.Properties;
-
public class KafkaITCase extends KafkaConsumerTestBase {
@Override
@@ -36,65 +35,65 @@ public class KafkaITCase extends KafkaConsumerTestBase {
// ------------------------------------------------------------------------
@Test
- public void testCheckpointing() {
+ public void testCheckpointing() throws Exception {
runCheckpointingTest();
}
@Test
- public void testOffsetInZookeeper() {
+ public void testOffsetInZookeeper() throws Exception {
runOffsetInZookeeperValidationTest();
}
@Test
- public void testConcurrentProducerConsumerTopology() {
+ public void testConcurrentProducerConsumerTopology() throws Exception {
runSimpleConcurrentProducerConsumerTopology();
}
// --- canceling / failures ---
@Test
- public void testCancelingEmptyTopic() {
+ public void testCancelingEmptyTopic() throws Exception {
runCancelingOnEmptyInputTest();
}
@Test
- public void testCancelingFullTopic() {
+ public void testCancelingFullTopic() throws Exception {
runCancelingOnFullInputTest();
}
@Test
- public void testFailOnDeploy() {
+ public void testFailOnDeploy() throws Exception {
runFailOnDeployTest();
}
// --- source to partition mappings and exactly once ---
@Test
- public void testOneToOneSources() {
+ public void testOneToOneSources() throws Exception {
runOneToOneExactlyOnceTest();
}
@Test
- public void testOneSourceMultiplePartitions() {
+ public void testOneSourceMultiplePartitions() throws Exception {
runOneSourceMultiplePartitionsExactlyOnceTest();
}
@Test
- public void testMultipleSourcesOnePartition() {
+ public void testMultipleSourcesOnePartition() throws Exception {
runMultipleSourcesOnePartitionExactlyOnceTest();
}
// --- broker failure ---
@Test
- public void testBrokerFailure() {
+ public void testBrokerFailure() throws Exception {
runBrokerFailureTest();
}
// --- special executions ---
@Test
- public void testBigRecordJob() {
+ public void testBigRecordJob() throws Exception {
runBigRecordTestTopology();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/563ee9ad/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java
index 5903844..5001364 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java
@@ -26,11 +26,14 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.testutils.SuccessException;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
import org.junit.Test;
+import java.io.Serializable;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -100,8 +103,7 @@ public class KafkaProducerITCase extends KafkaTestBase {
.setParallelism(1);
// sink partitions into
- stream.addSink(new KafkaSink<Tuple2<Long, String>>(
- brokerConnectionStrings, topic,serSchema, new CustomPartitioner(parallelism)))
+ stream.addSink(new FlinkKafkaProducer<>(topic, serSchema, FlinkKafkaProducer.getPropertiesFromBrokerList(brokerConnectionStrings), new CustomPartitioner(parallelism)))
.setParallelism(parallelism);
// ------ consuming topology ---------
@@ -165,7 +167,7 @@ public class KafkaProducerITCase extends KafkaTestBase {
// ------------------------------------------------------------------------
- public static class CustomPartitioner implements SerializableKafkaPartitioner {
+ public static class CustomPartitioner extends KafkaPartitioner implements Serializable {
private final int expectedPartitions;