Posted to commits@flink.apache.org by tz...@apache.org on 2017/07/06 04:23:52 UTC

[6/7] flink git commit: [FLINK-6996] [kafka] Fix at-least-once semantic for FlinkKafkaProducer010

[FLINK-6996] [kafka] Fix at-least-once semantic for FlinkKafkaProducer010

This closes #4206.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/21ec86f0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/21ec86f0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/21ec86f0

Branch: refs/heads/master
Commit: 21ec86f0836e1157ddb7fbf1a0e3c527259461ab
Parents: 10f5069
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Mon Jun 26 11:28:51 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Jul 6 12:11:57 2017 +0800

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaProducer010.java |  17 +-
 .../kafka/Kafka010ProducerITCase.java           |   8 -
 .../kafka/KafkaTestEnvironmentImpl.java         |  35 +++
 .../connectors/kafka/Kafka08ProducerITCase.java |  12 +-
 .../kafka/KafkaTestEnvironmentImpl.java         |  34 ++-
 .../connectors/kafka/Kafka09ProducerITCase.java |  10 +-
 .../kafka/KafkaTestEnvironmentImpl.java         |  34 +++
 .../connectors/kafka/KafkaConsumerTestBase.java |  15 +-
 .../connectors/kafka/KafkaProducerTestBase.java | 253 ++++++++++++++++++-
 .../connectors/kafka/KafkaTestEnvironment.java  |  14 +
 10 files changed, 405 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/21ec86f0/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index 805bc4e..3b9dff1 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -22,6 +22,9 @@ import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -65,7 +68,7 @@ import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase
  *
  * <p>All methods and constructors in this class are marked with the approach they are needed for.
  */
-public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunction<T>, RichFunction {
+public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunction<T>, RichFunction, CheckpointedFunction {
 
 	/**
 	 * Flag controlling whether we are writing the Flink record's timestamp into Kafka.
@@ -411,6 +414,18 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 		invokeInternal(element.getValue(), element.getTimestamp());
 	}
 
+	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
+		final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
+		internalProducer.initializeState(context);
+	}
+
+	@Override
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
+		internalProducer.snapshotState(context);
+	}
+
 	/**
 	 * Configuration object returned by the writeToKafkaWithTimestamps() call.
 	 */
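
For context, here is a minimal usage sketch of the behaviour this commit restores (for illustration only; the topic name and broker address are placeholders, SimpleStringSchema is the 1.3-era serialization schema, and the configuration object returned by writeToKafkaWithTimestamps() is assumed to expose setFlushOnCheckpoint(), as the Javadoc above indicates). With the CheckpointedFunction delegation in place, flush-on-checkpoint also takes effect when the producer is attached as a custom operator:

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class AtLeastOnceProducerSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// pending records are flushed on checkpoints, so checkpointing must be enabled
		env.enableCheckpointing(500);

		DataStream<String> stream = env.fromElements("a", "b", "c");

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address

		// attach the 0.10 producer as a custom operator; before this commit the resulting
		// StreamSink never forwarded snapshotState() to the wrapped FlinkKafkaProducerBase,
		// so the flush-on-checkpoint flag below had no effect in this mode
		FlinkKafkaProducer010
				.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), props)
				.setFlushOnCheckpoint(true);

		env.execute("at-least-once sketch");
	}
}

Without the snapshotState() delegation, records still sitting in the KafkaProducer's internal batches at checkpoint time were not flushed and could be lost on failure, which is the at-least-once violation tracked as FLINK-6996.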

http://git-wip-us.apache.org/repos/asf/flink/blob/21ec86f0/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
index 64a5a3f..f811893 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
@@ -18,17 +18,9 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.junit.Test;
-
 /**
  * IT cases for the {@link FlinkKafkaProducer010}.
  */
 @SuppressWarnings("serial")
 public class Kafka010ProducerITCase extends KafkaProducerTestBase {
-
-	@Test
-	public void testCustomPartitioning() {
-		runCustomPartitioningTest();
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21ec86f0/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index cb30fbf..c7e793a 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -33,8 +33,10 @@ import kafka.server.KafkaServer;
 import kafka.utils.SystemTime$;
 import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.collections.list.UnmodifiableList;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
@@ -46,7 +48,10 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.net.BindException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -116,6 +121,31 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
+	public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties properties, String topic, int partition, long timeout) {
+		List<ConsumerRecord<K, V>> result = new ArrayList<>();
+		KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties);
+		consumer.assign(Arrays.asList(new TopicPartition(topic, partition)));
+
+		while (true) {
+			boolean processedAtLeastOneRecord = false;
+
+			// wait for new records with timeout and break the loop if we didn't get any
+			Iterator<ConsumerRecord<K, V>> iterator = consumer.poll(timeout).iterator();
+			while (iterator.hasNext()) {
+				ConsumerRecord<K, V> record = iterator.next();
+				result.add(record);
+				processedAtLeastOneRecord = true;
+			}
+
+			if (!processedAtLeastOneRecord) {
+				break;
+			}
+		}
+
+		return UnmodifiableList.decorate(result);
+	}
+
+	@Override
 	public <T> StreamSink<T> getProducerSink(String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
 		FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
 		prod.setFlushOnCheckpoint(true);
@@ -130,6 +160,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
+	public <T> DataStreamSink<T> writeToKafkaWithTimestamps(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props) {
+		return FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, topic, serSchema, props);
+	}
+
+	@Override
 	public KafkaOffsetHandler createOffsetHandler() {
 		return new KafkaOffsetHandlerImpl();
 	}
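
A rough sketch of how a test is expected to drive the new helper (for illustration only; the topic name is a placeholder, kafkaServer and standardProps are the existing KafkaTestBase fields, and the deserializer settings mirror the ones used by assertAtLeastOnceForTopic later in this commit). getAllRecordsFromTopic() keeps polling with the supplied timeout and returns an unmodifiable list once a poll comes back empty, so a quiet partition ends the call after roughly one timeout interval:

	Properties consumerProps = new Properties();
	consumerProps.putAll(standardProps); // bootstrap.servers, group.id, ... from the test base
	consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
	consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");

	// drain partition 0 of the test topic; each poll() waits at most 100 ms, and the loop in
	// getAllRecordsFromTopic() stops after the first poll that returns no records
	Collection<ConsumerRecord<Integer, Integer>> records =
			kafkaServer.getAllRecordsFromTopic(consumerProps, "my-test-topic", 0, 100L);

	Set<Integer> values = new HashSet<>();
	for (ConsumerRecord<Integer, Integer> record : records) {
		values.add(record.value());
	}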

http://git-wip-us.apache.org/repos/asf/flink/blob/21ec86f0/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
index 8074765..681fe02 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
@@ -18,17 +18,19 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.junit.Test;
-
 /**
  * IT cases for the {@link FlinkKafkaProducer08}.
  */
 @SuppressWarnings("serial")
 public class Kafka08ProducerITCase extends KafkaProducerTestBase {
 
-	@Test
-	public void testCustomPartitioning() {
-		runCustomPartitioningTest();
+	@Override
+	public void testOneToOneAtLeastOnceRegularSink() throws Exception {
+		// TODO: enable this for Kafka 0.8 - now it hangs indefinitely
 	}
 
+	@Override
+	public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
+		// Disable this test since FlinkKafkaProducer08 doesn't support the custom operator mode
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21ec86f0/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index b37f848..4791716 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -38,13 +38,17 @@ import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
 import kafka.utils.SystemTime$;
 import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.collections.list.UnmodifiableList;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.test.TestingServer;
-
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,8 +57,10 @@ import java.io.IOException;
 import java.net.BindException;
 import java.nio.file.Files;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 
@@ -110,6 +116,27 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
+	public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties properties, String topic, int partition, long timeout) {
+		List<ConsumerRecord<K, V>> result = new ArrayList<>();
+		KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties);
+		consumer.subscribe(new TopicPartition(topic, partition));
+
+		while (true) {
+			Map<String, ConsumerRecords<K, V>> topics = consumer.poll(timeout);
+			if (topics == null || !topics.containsKey(topic)) {
+				break;
+			}
+			List<ConsumerRecord<K, V>> records = topics.get(topic).records(partition);
+			result.addAll(records);
+			if (records.size() == 0) {
+				break;
+			}
+		}
+
+		return UnmodifiableList.decorate(result);
+	}
+
+	@Override
 	public <T> StreamSink<T> getProducerSink(
 			String topic,
 			KeyedSerializationSchema<T> serSchema,
@@ -132,6 +159,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
+	public <T> DataStreamSink<T> writeToKafkaWithTimestamps(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
 	public KafkaOffsetHandler createOffsetHandler() {
 		return new KafkaOffsetHandlerImpl();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/21ec86f0/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
index fe8a1a5..847f818 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
@@ -18,17 +18,13 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.junit.Test;
-
 /**
  * IT cases for the {@link FlinkKafkaProducer09}.
  */
 @SuppressWarnings("serial")
 public class Kafka09ProducerITCase extends KafkaProducerTestBase {
-
-	@Test
-	public void testCustomPartitioning() {
-		runCustomPartitioningTest();
+	@Override
+	public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
+		// Disable this test since FlinkKafkaProducer09 doesn't support the custom operator mode
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21ec86f0/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index fc38e24..ab82ef3 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -35,8 +35,10 @@ import kafka.server.KafkaServer;
 import kafka.utils.SystemTime$;
 import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.collections.list.UnmodifiableList;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
@@ -47,7 +49,10 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.net.BindException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -103,6 +108,30 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
+	public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties properties, String topic, int partition, long timeout) {
+		List<ConsumerRecord<K, V>> result = new ArrayList<>();
+		KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties);
+		consumer.assign(Arrays.asList(new TopicPartition(topic, partition)));
+
+		while (true) {
+			boolean processedAtLeastOneRecord = false;
+
+			Iterator<ConsumerRecord<K, V>> iterator = consumer.poll(timeout).iterator();
+			while (iterator.hasNext()) {
+				ConsumerRecord<K, V> record = iterator.next();
+				result.add(record);
+				processedAtLeastOneRecord = true;
+			}
+
+			if (!processedAtLeastOneRecord) {
+				break;
+			}
+		}
+
+		return UnmodifiableList.decorate(result);
+	}
+
+	@Override
 	public <T> StreamSink<T> getProducerSink(
 			String topic,
 			KeyedSerializationSchema<T> serSchema,
@@ -121,6 +150,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
+	public <T> DataStreamSink<T> writeToKafkaWithTimestamps(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
 	public KafkaOffsetHandler createOffsetHandler() {
 		return new KafkaOffsetHandlerImpl();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/21ec86f0/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index c25c4f5..dac45f7 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -788,7 +788,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		DataGenerators.generateRandomizedIntegerSequence(
 				StreamExecutionEnvironment.getExecutionEnvironment(),
 				kafkaServer,
-				topic, parallelism, numElementsPerPartition, true);
+				topic,
+				parallelism,
+				numElementsPerPartition,
+				true);
 
 		// run the topology that fails and recovers
 
@@ -837,7 +840,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		DataGenerators.generateRandomizedIntegerSequence(
 				StreamExecutionEnvironment.getExecutionEnvironment(),
 				kafkaServer,
-				topic, numPartitions, numElementsPerPartition, false);
+				topic,
+				numPartitions,
+				numElementsPerPartition,
+				true);
 
 		// run the topology that fails and recovers
 
@@ -885,7 +891,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		DataGenerators.generateRandomizedIntegerSequence(
 				StreamExecutionEnvironment.getExecutionEnvironment(),
 				kafkaServer,
-				topic, numPartitions, numElementsPerPartition, true);
+				topic,
+				numPartitions,
+				numElementsPerPartition,
+				true);
 
 		// run the topology that fails and recovers
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21ec86f0/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index e292e13..1af9ca8 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -18,26 +18,47 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Preconditions;
 
+import com.google.common.collect.ImmutableSet;
+import kafka.server.KafkaServer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.Test;
+
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 
 import static org.apache.flink.test.util.TestUtils.tryExecute;
 import static org.junit.Assert.assertEquals;
@@ -77,7 +98,8 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 	 * <p>Each topic also has a final sink that validates that there are no duplicates and that all
 	 * partitions are present.
 	 */
-	public void runCustomPartitioningTest() {
+	@Test
+	public void testCustomPartitioning() {
 		try {
 			LOG.info("Starting KafkaProducerITCase.testCustomPartitioning()");
 
@@ -133,7 +155,9 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 			props.putAll(secureProps);
 
 			// sink partitions into
-			kafkaServer.produceIntoKafka(stream, defaultTopic,
+			kafkaServer.produceIntoKafka(
+					stream,
+					defaultTopic,
 					// this serialization schema will route between the default topic and dynamic topic
 					new CustomKeyedSerializationSchemaWrapper(serSchema, defaultTopic, dynamicTopic),
 					props,
@@ -172,6 +196,145 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 		}
 	}
 
+	/**
+	 * Tests the at-least-once semantic for the simple writes into Kafka.
+	 */
+	@Test
+	public void testOneToOneAtLeastOnceRegularSink() throws Exception {
+		testOneToOneAtLeastOnce(true);
+	}
+
+	/**
+	 * Tests the at-least-once semantic for the simple writes into Kafka.
+	 */
+	@Test
+	public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
+		testOneToOneAtLeastOnce(false);
+	}
+
+	/**
+	 * This test sets KafkaProducer so that it will not automatically flush the data and
+	 * fails the broker to check whether FlinkKafkaProducer flushes records manually on snapshotState.
+	 */
+	protected void testOneToOneAtLeastOnce(boolean regularSink) throws Exception {
+		final String topic = regularSink ? "oneToOneTopicRegularSink" : "oneToOneTopicCustomOperator";
+		final int partition = 0;
+		final int numElements = 1000;
+		final int failAfterElements = 333;
+
+		createTestTopic(topic, 1, 1);
+
+		TypeInformationSerializationSchema<Integer> schema = new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+		KeyedSerializationSchema<Integer> keyedSerializationSchema = new KeyedSerializationSchemaWrapper(schema);
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.enableCheckpointing(500);
+		env.setParallelism(1);
+		env.setRestartStrategy(RestartStrategies.noRestart());
+		env.getConfig().disableSysoutLogging();
+
+		Properties properties = new Properties();
+		properties.putAll(standardProps);
+		properties.putAll(secureProps);
+		// decrease timeout and block time from 60s down to 10s - this is how long KafkaProducer will try to send pending (not flushed) data on close()
+		properties.setProperty("timeout.ms", "10000");
+		properties.setProperty("max.block.ms", "10000");
+		// increase batch.size and linger.ms - this tells KafkaProducer to batch produced events instead of flushing them immediately
+		properties.setProperty("batch.size", "10240000");
+		properties.setProperty("linger.ms", "10000");
+
+		int leaderId = kafkaServer.getLeaderToShutDown(topic);
+		BrokerRestartingMapper.resetState();
+
+		// process exactly failAfterElements number of elements and then shutdown Kafka broker and fail application
+		DataStream<Integer> inputStream = env
+			.fromCollection(getIntegersSequence(numElements))
+			.map(new BrokerRestartingMapper<Integer>(leaderId, failAfterElements));
+
+		StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
+			@Override
+			public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+				return partition;
+			}
+		});
+
+		if (regularSink) {
+			inputStream.addSink(kafkaSink.getUserFunction());
+		}
+		else {
+			kafkaServer.produceIntoKafka(inputStream, topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
+				@Override
+				public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+					return partition;
+				}
+			});
+		}
+
+		FailingIdentityMapper.failedBefore = false;
+		try {
+			env.execute("One-to-one at least once test");
+			fail("Job should fail!");
+		}
+		catch (JobExecutionException ex) {
+			assertEquals("Broker was shutdown!", ex.getCause().getMessage());
+		}
+
+		kafkaServer.restartBroker(leaderId);
+
+		// assert that before failure we successfully snapshot/flushed all expected elements
+		// assert that every element snapshotted/flushed before the failure is present in the topic
+				properties,
+				topic,
+				partition,
+				ImmutableSet.copyOf(getIntegersSequence(BrokerRestartingMapper.numElementsBeforeSnapshot)),
+				30000L);
+
+		deleteTestTopic(topic);
+	}
+
+	/**
+	 * We handle the timeout manually instead of using JUnit's timeout, so that a failure is reported rather than a timeout error.
+	 * Once the timeout expires we assume that records are missing and that there is a bug, not that the test simply ran out of time.
+	 */
+	private void assertAtLeastOnceForTopic(
+			Properties properties,
+			String topic,
+			int partition,
+			Set<Integer> expectedElements,
+			long timeoutMillis) throws Exception {
+
+		long startMillis = System.currentTimeMillis();
+		Set<Integer> actualElements = new HashSet<>();
+
+		// until we timeout...
+		while (System.currentTimeMillis() < startMillis + timeoutMillis) {
+			properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
+			properties.put("value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
+
+			// query kafka for new records ...
+			Collection<ConsumerRecord<Integer, Integer>> records = kafkaServer.getAllRecordsFromTopic(properties, topic, partition, 100);
+
+			for (ConsumerRecord<Integer, Integer> record : records) {
+				actualElements.add(record.value());
+			}
+
+			// succeed if we got all expectedElements
+			if (actualElements.containsAll(expectedElements)) {
+				return;
+			}
+		}
+
+		fail(String.format("Expected to contain all of: <%s>, but was: <%s>", expectedElements, actualElements));
+	}
+
+	private List<Integer> getIntegersSequence(int size) {
+		List<Integer> result = new ArrayList<>(size);
+		for (int i = 0; i < size; i++) {
+			result.add(i);
+		}
+		return result;
+	}
+
 	// ------------------------------------------------------------------------
 
 	private static class CustomPartitioner extends FlinkKafkaPartitioner<Tuple2<Long, String>> implements Serializable {
@@ -266,4 +429,90 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 			}
 		}
 	}
+
+	private static class BrokerRestartingMapper<T> extends RichMapFunction<T, T>
+		implements CheckpointedFunction, CheckpointListener {
+
+		private static final long serialVersionUID = 6334389850158707313L;
+
+		public static volatile boolean restartedLeaderBefore;
+		public static volatile boolean hasBeenCheckpointedBeforeFailure;
+		public static volatile int numElementsBeforeSnapshot;
+
+		private final int shutdownBrokerId;
+		private final int failCount;
+		private int numElementsTotal;
+
+		private boolean failer;
+		private boolean hasBeenCheckpointed;
+
+		public static void resetState() {
+			restartedLeaderBefore = false;
+			hasBeenCheckpointedBeforeFailure = false;
+			numElementsBeforeSnapshot = 0;
+		}
+
+		public BrokerRestartingMapper(int shutdownBrokerId, int failCount) {
+			this.shutdownBrokerId = shutdownBrokerId;
+			this.failCount = failCount;
+		}
+
+		@Override
+		public void open(Configuration parameters) {
+			failer = getRuntimeContext().getIndexOfThisSubtask() == 0;
+		}
+
+		@Override
+		public T map(T value) throws Exception {
+			numElementsTotal++;
+
+			if (!restartedLeaderBefore) {
+				Thread.sleep(10);
+
+				if (failer && numElementsTotal >= failCount) {
+					// shut down a Kafka broker
+					KafkaServer toShutDown = null;
+					for (KafkaServer server : kafkaServer.getBrokers()) {
+
+						if (kafkaServer.getBrokerId(server) == shutdownBrokerId) {
+							toShutDown = server;
+							break;
+						}
+					}
+
+					if (toShutDown == null) {
+						StringBuilder listOfBrokers = new StringBuilder();
+						for (KafkaServer server : kafkaServer.getBrokers()) {
+							listOfBrokers.append(kafkaServer.getBrokerId(server));
+							listOfBrokers.append(" ; ");
+						}
+
+						throw new Exception("Cannot find broker to shut down: " + shutdownBrokerId
+												+ " ; available brokers: " + listOfBrokers.toString());
+					} else {
+						hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
+						restartedLeaderBefore = true;
+						toShutDown.shutdown();
+						toShutDown.awaitShutdown();
+						throw new Exception("Broker was shutdown!");
+					}
+				}
+			}
+			return value;
+		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) {
+			hasBeenCheckpointed = true;
+		}
+
+		@Override
+		public void snapshotState(FunctionSnapshotContext context) throws Exception {
+			numElementsBeforeSnapshot = numElementsTotal;
+		}
+
+		@Override
+		public void initializeState(FunctionInitializationContext context) throws Exception {
+		}
+	}
 }
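
Taken together, the new test boils down to a single invariant; the following paraphrase is for illustration only (drainTopicValues() is a hypothetical stand-in for collecting values via kafkaServer.getAllRecordsFromTopic()): every element processed before the last snapshot must be readable from the topic once the broker is back, while duplicate elements are acceptable under at-least-once:

	// every value seen before the last snapshot must have been flushed to Kafka by snapshotState()
	Set<Integer> expected = new HashSet<>(getIntegersSequence(BrokerRestartingMapper.numElementsBeforeSnapshot));
	Set<Integer> actual = drainTopicValues(properties, topic, partition); // hypothetical helper

	// missing elements violate at-least-once; extra (duplicate) elements do not
	assertTrue("records covered by a completed checkpoint are missing from Kafka", actual.containsAll(expected));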

http://git-wip-us.apache.org/repos/asf/flink/blob/21ec86f0/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 4df3465..50eff23 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -27,7 +27,9 @@ import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaW
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 
 import kafka.server.KafkaServer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
@@ -80,6 +82,12 @@ public abstract class KafkaTestEnvironment {
 
 	public abstract <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props);
 
+	public abstract <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(
+			Properties properties,
+			String topic,
+			int partition,
+			long timeout);
+
 	public abstract <T> StreamSink<T> getProducerSink(String topic,
 			KeyedSerializationSchema<T> serSchema, Properties props,
 			FlinkKafkaPartitioner<T> partitioner);
@@ -88,6 +96,12 @@ public abstract class KafkaTestEnvironment {
 														KeyedSerializationSchema<T> serSchema, Properties props,
 														FlinkKafkaPartitioner<T> partitioner);
 
+	public abstract <T> DataStreamSink<T> writeToKafkaWithTimestamps(
+			DataStream<T> stream,
+			String topic,
+			KeyedSerializationSchema<T> serSchema,
+			Properties props);
+
 	// -- offset handlers
 
 	/**