Posted to commits@flink.apache.org by tz...@apache.org on 2017/07/06 04:23:52 UTC
[6/7] flink git commit: [FLINK-6996] [kafka] Fix at-least-once semantic for FlinkKafkaProducer010
[FLINK-6996] [kafka] Fix at-least-once semantic for FlinkKafkaProducer010
This closes #4206.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/21ec86f0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/21ec86f0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/21ec86f0
Branch: refs/heads/master
Commit: 21ec86f0836e1157ddb7fbf1a0e3c527259461ab
Parents: 10f5069
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Mon Jun 26 11:28:51 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Jul 6 12:11:57 2017 +0800
----------------------------------------------------------------------
.../connectors/kafka/FlinkKafkaProducer010.java | 17 +-
.../kafka/Kafka010ProducerITCase.java | 8 -
.../kafka/KafkaTestEnvironmentImpl.java | 35 +++
.../connectors/kafka/Kafka08ProducerITCase.java | 12 +-
.../kafka/KafkaTestEnvironmentImpl.java | 34 ++-
.../connectors/kafka/Kafka09ProducerITCase.java | 10 +-
.../kafka/KafkaTestEnvironmentImpl.java | 34 +++
.../connectors/kafka/KafkaConsumerTestBase.java | 15 +-
.../connectors/kafka/KafkaProducerTestBase.java | 253 ++++++++++++++++++-
.../connectors/kafka/KafkaTestEnvironment.java | 14 +
10 files changed, 405 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
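The core of the fix is the flush-on-checkpoint pattern already implemented by FlinkKafkaProducerBase: sends are asynchronous, and snapshotState() must block until every in-flight record has been acknowledged, so that a completed checkpoint implies at-least-once delivery. A minimal sketch of that pattern, assuming a hypothetical sendAsync() as a stand-in for KafkaProducer.send(record, callback) (the class and its members are illustrative, not part of this commit):

import java.util.concurrent.atomic.AtomicLong;

import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class FlushOnCheckpointSketch<T> extends RichSinkFunction<T> implements CheckpointedFunction {

	private final AtomicLong pendingRecords = new AtomicLong();

	@Override
	public void invoke(T value) throws Exception {
		pendingRecords.incrementAndGet();
		// hypothetical async send; the ack callback decrements the counter
		sendAsync(value, () -> {
			synchronized (pendingRecords) {
				pendingRecords.decrementAndGet();
				pendingRecords.notifyAll();
			}
		});
	}

	@Override
	public void snapshotState(FunctionSnapshotContext context) throws Exception {
		// block the checkpoint until all in-flight sends are acknowledged
		synchronized (pendingRecords) {
			while (pendingRecords.get() > 0) {
				pendingRecords.wait();
			}
		}
	}

	@Override
	public void initializeState(FunctionInitializationContext context) throws Exception {
		// nothing to restore; the guarantee comes purely from flushing before the snapshot
	}

	private void sendAsync(T value, Runnable ackCallback) {
		// stand-in for the real asynchronous producer call; acks immediately here
		ackCallback.run();
	}
}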
http://git-wip-us.apache.org/repos/asf/flink/blob/21ec86f0/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index 805bc4e..3b9dff1 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -22,6 +22,9 @@ import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -65,7 +68,7 @@ import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase
*
* <p>All methods and constructors in this class are marked with the approach they are needed for.
*/
-public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunction<T>, RichFunction {
+public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunction<T>, RichFunction, CheckpointedFunction {
/**
* Flag controlling whether we are writing the Flink record's timestamp into Kafka.
@@ -411,6 +414,18 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
invokeInternal(element.getValue(), element.getTimestamp());
}
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
+ internalProducer.initializeState(context);
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
+ internalProducer.snapshotState(context);
+ }
+
/**
* Configuration object returned by the writeToKafkaWithTimestamps() call.
*/
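With snapshotState() and initializeState() now forwarded to the wrapped FlinkKafkaProducerBase, the custom-operator mode participates in checkpointing like the regular sink does. A hedged usage sketch (topic name and bootstrap address are placeholders; setFlushOnCheckpoint() is assumed available on the configuration object returned by writeToKafkaWithTimestamps()):

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class Kafka010WriteSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.enableCheckpointing(500); // at-least-once only holds with checkpointing enabled

		DataStream<String> stream = env.fromElements("a", "b", "c");

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address

		// custom operator mode: the producer runs as a StreamSink operator and,
		// with this fix, flushes pending records when snapshotState() is called
		FlinkKafkaProducer010.FlinkKafkaProducer010Configuration<String> config =
				FlinkKafkaProducer010.writeToKafkaWithTimestamps(
						stream, "sketch-topic", new SimpleStringSchema(), props);
		config.setFlushOnCheckpoint(true);

		env.execute("kafka 0.10 write sketch");
	}
}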
http://git-wip-us.apache.org/repos/asf/flink/blob/21ec86f0/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
index 64a5a3f..f811893 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
@@ -18,17 +18,9 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.junit.Test;
-
/**
* IT cases for the {@link FlinkKafkaProducer010}.
*/
@SuppressWarnings("serial")
public class Kafka010ProducerITCase extends KafkaProducerTestBase {
-
- @Test
- public void testCustomPartitioning() {
- runCustomPartitioningTest();
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21ec86f0/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index cb30fbf..c7e793a 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -33,8 +33,10 @@ import kafka.server.KafkaServer;
import kafka.utils.SystemTime$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.collections.list.UnmodifiableList;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingServer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
@@ -46,7 +48,10 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.net.BindException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -116,6 +121,31 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
+ public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties properties, String topic, int partition, long timeout) {
+ List<ConsumerRecord<K, V>> result = new ArrayList<>();
+ KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties);
+ consumer.assign(Arrays.asList(new TopicPartition(topic, partition)));
+
+ while (true) {
+ boolean processedAtLeastOneRecord = false;
+
+ // wait for new records with timeout and break the loop if we didn't get any
+ Iterator<ConsumerRecord<K, V>> iterator = consumer.poll(timeout).iterator();
+ while (iterator.hasNext()) {
+ ConsumerRecord<K, V> record = iterator.next();
+ result.add(record);
+ processedAtLeastOneRecord = true;
+ }
+
+ if (!processedAtLeastOneRecord) {
+ break;
+ }
+ }
+
+ return UnmodifiableList.decorate(result);
+ }
+
+ @Override
public <T> StreamSink<T> getProducerSink(String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
prod.setFlushOnCheckpoint(true);
@@ -130,6 +160,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
+ public <T> DataStreamSink<T> writeToKafkaWithTimestamps(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props) {
+ return FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, topic, serSchema, props);
+ }
+
+ @Override
public KafkaOffsetHandler createOffsetHandler() {
return new KafkaOffsetHandlerImpl();
}
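getAllRecordsFromTopic() drains a partition by polling until a whole poll interval passes with no records, so the timeout argument doubles as the quiet period that ends the read. A hedged sketch of calling it from a test, mirroring how KafkaProducerTestBase uses it below (the topic name is illustrative; kafkaServer and standardProps come from KafkaTestBase):

// assumed to run inside a KafkaTestBase subclass, with the usual java.util
// and org.apache.kafka.clients.consumer imports in scope
Properties props = new Properties();
props.putAll(standardProps);
props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");

// a poll that stays empty for 100 ms ends the drain
Collection<ConsumerRecord<Integer, Integer>> records =
		kafkaServer.getAllRecordsFromTopic(props, "sketch-topic", 0, 100L);

Set<Integer> values = new HashSet<>();
for (ConsumerRecord<Integer, Integer> record : records) {
	values.add(record.value()); // a Set collapses the duplicates that at-least-once permits
}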
http://git-wip-us.apache.org/repos/asf/flink/blob/21ec86f0/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
index 8074765..681fe02 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
@@ -18,17 +18,19 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.junit.Test;
-
/**
* IT cases for the {@link FlinkKafkaProducer08}.
*/
@SuppressWarnings("serial")
public class Kafka08ProducerITCase extends KafkaProducerTestBase {
- @Test
- public void testCustomPartitioning() {
- runCustomPartitioningTest();
+ @Override
+ public void testOneToOneAtLeastOnceRegularSink() throws Exception {
+ // TODO: enable this for Kafka 0.8 - now it hangs indefinitely
}
+ @Override
+ public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
+ // Disable this test since FlinkKafkaProducer08 doesn't support the custom operator mode
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21ec86f0/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index b37f848..4791716 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -38,13 +38,17 @@ import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.SystemTime$;
import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.collections.list.UnmodifiableList;
import org.apache.commons.io.FileUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
-
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,8 +57,10 @@ import java.io.IOException;
import java.net.BindException;
import java.nio.file.Files;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.UUID;
@@ -110,6 +116,27 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
+ public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties properties, String topic, int partition, long timeout) {
+ List<ConsumerRecord<K, V>> result = new ArrayList<>();
+ KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties);
+ consumer.subscribe(new TopicPartition(topic, partition));
+
+ while (true) {
+ Map<String, ConsumerRecords<K, V>> topics = consumer.poll(timeout);
+ if (topics == null || !topics.containsKey(topic)) {
+ break;
+ }
+ List<ConsumerRecord<K, V>> records = topics.get(topic).records(partition);
+ result.addAll(records);
+ if (records.size() == 0) {
+ break;
+ }
+ }
+
+ return UnmodifiableList.decorate(result);
+ }
+
+ @Override
public <T> StreamSink<T> getProducerSink(
String topic,
KeyedSerializationSchema<T> serSchema,
@@ -132,6 +159,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
+ public <T> DataStreamSink<T> writeToKafkaWithTimestamps(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public KafkaOffsetHandler createOffsetHandler() {
return new KafkaOffsetHandlerImpl();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21ec86f0/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
index fe8a1a5..847f818 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
@@ -18,17 +18,13 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.junit.Test;
-
/**
* IT cases for the {@link FlinkKafkaProducer09}.
*/
@SuppressWarnings("serial")
public class Kafka09ProducerITCase extends KafkaProducerTestBase {
-
- @Test
- public void testCustomPartitioning() {
- runCustomPartitioningTest();
+ @Override
+ public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
+ // Disable this test since FlinkKafkaProducer09 doesn't support the custom operator mode
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21ec86f0/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index fc38e24..ab82ef3 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -35,8 +35,10 @@ import kafka.server.KafkaServer;
import kafka.utils.SystemTime$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.collections.list.UnmodifiableList;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingServer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
@@ -47,7 +49,10 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.net.BindException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -103,6 +108,30 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
+ public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties properties, String topic, int partition, long timeout) {
+ List<ConsumerRecord<K, V>> result = new ArrayList<>();
+ KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties);
+ consumer.assign(Arrays.asList(new TopicPartition(topic, partition)));
+
+ while (true) {
+ boolean processedAtLeastOneRecord = false;
+
+ Iterator<ConsumerRecord<K, V>> iterator = consumer.poll(timeout).iterator();
+ while (iterator.hasNext()) {
+ ConsumerRecord<K, V> record = iterator.next();
+ result.add(record);
+ processedAtLeastOneRecord = true;
+ }
+
+ if (!processedAtLeastOneRecord) {
+ break;
+ }
+ }
+
+ return UnmodifiableList.decorate(result);
+ }
+
+ @Override
public <T> StreamSink<T> getProducerSink(
String topic,
KeyedSerializationSchema<T> serSchema,
@@ -121,6 +150,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
+ public <T> DataStreamSink<T> writeToKafkaWithTimestamps(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public KafkaOffsetHandler createOffsetHandler() {
return new KafkaOffsetHandlerImpl();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21ec86f0/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index c25c4f5..dac45f7 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -788,7 +788,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
DataGenerators.generateRandomizedIntegerSequence(
StreamExecutionEnvironment.getExecutionEnvironment(),
kafkaServer,
- topic, parallelism, numElementsPerPartition, true);
+ topic,
+ parallelism,
+ numElementsPerPartition,
+ true);
// run the topology that fails and recovers
@@ -837,7 +840,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
DataGenerators.generateRandomizedIntegerSequence(
StreamExecutionEnvironment.getExecutionEnvironment(),
kafkaServer,
- topic, numPartitions, numElementsPerPartition, false);
+ topic,
+ numPartitions,
+ numElementsPerPartition,
+ true);
// run the topology that fails and recovers
@@ -885,7 +891,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
DataGenerators.generateRandomizedIntegerSequence(
StreamExecutionEnvironment.getExecutionEnvironment(),
kafkaServer,
- topic, numPartitions, numElementsPerPartition, true);
+ topic,
+ numPartitions,
+ numElementsPerPartition,
+ true);
// run the topology that fails and recovers
http://git-wip-us.apache.org/repos/asf/flink/blob/21ec86f0/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index e292e13..1af9ca8 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -18,26 +18,47 @@
package org.apache.flink.streaming.connectors.kafka;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import kafka.server.KafkaServer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.Test;
+
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import static org.apache.flink.test.util.TestUtils.tryExecute;
import static org.junit.Assert.assertEquals;
@@ -77,7 +98,8 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
* <p>Each topic also has a final sink that validates that there are no duplicates and that all
* partitions are present.
*/
- public void runCustomPartitioningTest() {
+ @Test
+ public void testCustomPartitioning() {
try {
LOG.info("Starting KafkaProducerITCase.testCustomPartitioning()");
@@ -133,7 +155,9 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
props.putAll(secureProps);
// sink partitions into
- kafkaServer.produceIntoKafka(stream, defaultTopic,
+ kafkaServer.produceIntoKafka(
+ stream,
+ defaultTopic,
// this serialization schema will route between the default topic and dynamic topic
new CustomKeyedSerializationSchemaWrapper(serSchema, defaultTopic, dynamicTopic),
props,
@@ -172,6 +196,145 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
}
}
+ /**
+ * Tests the at-least-once semantic for simple writes into Kafka through the regular sink function.
+ */
+ @Test
+ public void testOneToOneAtLeastOnceRegularSink() throws Exception {
+ testOneToOneAtLeastOnce(true);
+ }
+
+ /**
+ * Tests the at-least-once semantic for simple writes into Kafka through the custom sink operator.
+ */
+ @Test
+ public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
+ testOneToOneAtLeastOnce(false);
+ }
+
+ /**
+ * This test sets KafkaProducer so that it will not automatically flush the data, and then
+ * fails the broker to check whether FlinkKafkaProducer flushes records manually on snapshotState.
+ */
+ protected void testOneToOneAtLeastOnce(boolean regularSink) throws Exception {
+ final String topic = regularSink ? "oneToOneTopicRegularSink" : "oneToOneTopicCustomOperator";
+ final int partition = 0;
+ final int numElements = 1000;
+ final int failAfterElements = 333;
+
+ createTestTopic(topic, 1, 1);
+
+ TypeInformationSerializationSchema<Integer> schema = new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+ KeyedSerializationSchema<Integer> keyedSerializationSchema = new KeyedSerializationSchemaWrapper(schema);
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(500);
+ env.setParallelism(1);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ env.getConfig().disableSysoutLogging();
+
+ Properties properties = new Properties();
+ properties.putAll(standardProps);
+ properties.putAll(secureProps);
+ // decrease timeout and block time from 60s down to 10s - this is how long KafkaProducer will try to send pending (not flushed) data on close()
+ properties.setProperty("timeout.ms", "10000");
+ properties.setProperty("max.block.ms", "10000");
+ // increase batch.size and linger.ms - this tells KafkaProducer to batch produced events instead of flushing them immediately
+ properties.setProperty("batch.size", "10240000");
+ properties.setProperty("linger.ms", "10000");
+
+ int leaderId = kafkaServer.getLeaderToShutDown(topic);
+ BrokerRestartingMapper.resetState();
+
+ // process exactly failAfterElements elements, then shut down the Kafka broker and fail the application
+ DataStream<Integer> inputStream = env
+ .fromCollection(getIntegersSequence(numElements))
+ .map(new BrokerRestartingMapper<Integer>(leaderId, failAfterElements));
+
+ StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
+ @Override
+ public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+ return partition;
+ }
+ });
+
+ if (regularSink) {
+ inputStream.addSink(kafkaSink.getUserFunction());
+ }
+ else {
+ kafkaServer.produceIntoKafka(inputStream, topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
+ @Override
+ public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+ return partition;
+ }
+ });
+ }
+
+ FailingIdentityMapper.failedBefore = false;
+ try {
+ env.execute("One-to-one at least once test");
+ fail("Job should fail!");
+ }
+ catch (JobExecutionException ex) {
+ assertEquals("Broker was shutdown!", ex.getCause().getMessage());
+ }
+
+ kafkaServer.restartBroker(leaderId);
+
+ // assert that before failure we successfully snapshot/flushed all expected elements
+ assertAtLeastOnceForTopic(
+ properties,
+ topic,
+ partition,
+ ImmutableSet.copyOf(getIntegersSequence(BrokerRestartingMapper.numElementsBeforeSnapshot)),
+ 30000L);
+
+ deleteTestTopic(topic);
+ }
+
+ /**
+ * We handle the timeout manually instead of using JUnit's timeout so that the test reports a failure rather than a timeout error.
+ * After the timeout we assume that records are missing because of a bug, not that the test simply ran out of time.
+ */
+ private void assertAtLeastOnceForTopic(
+ Properties properties,
+ String topic,
+ int partition,
+ Set<Integer> expectedElements,
+ long timeoutMillis) throws Exception {
+
+ long startMillis = System.currentTimeMillis();
+ Set<Integer> actualElements = new HashSet<>();
+
+ // until we timeout...
+ while (System.currentTimeMillis() < startMillis + timeoutMillis) {
+ properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
+ properties.put("value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
+
+ // query kafka for new records ...
+ Collection<ConsumerRecord<Integer, Integer>> records = kafkaServer.getAllRecordsFromTopic(properties, topic, partition, 100);
+
+ for (ConsumerRecord<Integer, Integer> record : records) {
+ actualElements.add(record.value());
+ }
+
+ // succeed if we got all expectedElements
+ if (actualElements.containsAll(expectedElements)) {
+ return;
+ }
+ }
+
+ fail(String.format("Expected to contain all of: <%s>, but was: <%s>", expectedElements, actualElements));
+ }
+
+ private List<Integer> getIntegersSequence(int size) {
+ List<Integer> result = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ result.add(i);
+ }
+ return result;
+ }
+
// ------------------------------------------------------------------------
private static class CustomPartitioner extends FlinkKafkaPartitioner<Tuple2<Long, String>> implements Serializable {
@@ -266,4 +429,90 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
}
}
}
+
+ private static class BrokerRestartingMapper<T> extends RichMapFunction<T, T>
+ implements CheckpointedFunction, CheckpointListener {
+
+ private static final long serialVersionUID = 6334389850158707313L;
+
+ public static volatile boolean restartedLeaderBefore;
+ public static volatile boolean hasBeenCheckpointedBeforeFailure;
+ public static volatile int numElementsBeforeSnapshot;
+
+ private final int shutdownBrokerId;
+ private final int failCount;
+ private int numElementsTotal;
+
+ private boolean failer;
+ private boolean hasBeenCheckpointed;
+
+ public static void resetState() {
+ restartedLeaderBefore = false;
+ hasBeenCheckpointedBeforeFailure = false;
+ numElementsBeforeSnapshot = 0;
+ }
+
+ public BrokerRestartingMapper(int shutdownBrokerId, int failCount) {
+ this.shutdownBrokerId = shutdownBrokerId;
+ this.failCount = failCount;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
+ }
+
+ @Override
+ public T map(T value) throws Exception {
+ numElementsTotal++;
+
+ if (!restartedLeaderBefore) {
+ Thread.sleep(10);
+
+ if (failer && numElementsTotal >= failCount) {
+ // shut down a Kafka broker
+ KafkaServer toShutDown = null;
+ for (KafkaServer server : kafkaServer.getBrokers()) {
+
+ if (kafkaServer.getBrokerId(server) == shutdownBrokerId) {
+ toShutDown = server;
+ break;
+ }
+ }
+
+ if (toShutDown == null) {
+ StringBuilder listOfBrokers = new StringBuilder();
+ for (KafkaServer server : kafkaServer.getBrokers()) {
+ listOfBrokers.append(kafkaServer.getBrokerId(server));
+ listOfBrokers.append(" ; ");
+ }
+
+ throw new Exception("Cannot find broker to shut down: " + shutdownBrokerId
+ + " ; available brokers: " + listOfBrokers.toString());
+ } else {
+ hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
+ restartedLeaderBefore = true;
+ toShutDown.shutdown();
+ toShutDown.awaitShutdown();
+ throw new Exception("Broker was shutdown!");
+ }
+ }
+ }
+ return value;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ hasBeenCheckpointed = true;
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ numElementsBeforeSnapshot = numElementsTotal;
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ }
+ }
}
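Note the assertion style in assertAtLeastOnceForTopic(): at-least-once means no element written before the last successful snapshot may be lost, while duplicates are acceptable, so the check must be containsAll rather than equals. In sketch form (actualElements is the set read back from the topic, as in the method above):

// at-least-once: no element may be lost, but duplicates are fine
Set<Integer> expected = ImmutableSet.copyOf(
		getIntegersSequence(BrokerRestartingMapper.numElementsBeforeSnapshot));
assertTrue("records were lost", actualElements.containsAll(expected));
// asserting equality would be wrong: replays after recovery can legitimately
// deliver some elements more than once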
http://git-wip-us.apache.org/repos/asf/flink/blob/21ec86f0/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 4df3465..50eff23 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -27,7 +27,9 @@ import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaW
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import kafka.server.KafkaServer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
@@ -80,6 +82,12 @@ public abstract class KafkaTestEnvironment {
public abstract <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props);
+ public abstract <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(
+ Properties properties,
+ String topic,
+ int partition,
+ long timeout);
+
public abstract <T> StreamSink<T> getProducerSink(String topic,
KeyedSerializationSchema<T> serSchema, Properties props,
FlinkKafkaPartitioner<T> partitioner);
@@ -88,6 +96,12 @@ public abstract class KafkaTestEnvironment {
KeyedSerializationSchema<T> serSchema, Properties props,
FlinkKafkaPartitioner<T> partitioner);
+ public abstract <T> DataStreamSink<T> writeToKafkaWithTimestamps(
+ DataStream<T> stream,
+ String topic,
+ KeyedSerializationSchema<T> serSchema,
+ Properties props);
+
// -- offset handlers
/**