You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/10/09 16:59:07 UTC

[1/9] flink git commit: [FLINK-6988][kafka] Add Kafka 0.11 tests for scaling down and up again

Repository: flink
Updated Branches:
  refs/heads/master 08bfdae68 -> 2c734508d


[FLINK-6988][kafka] Add Kafka 0.11 tests for scaling down and up again


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2c734508
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2c734508
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2c734508

Branch: refs/heads/master
Commit: 2c734508d7b6a034748e7d60f2f2075cddf156d8
Parents: 4ada50b
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Fri Aug 25 09:47:12 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Oct 9 18:58:36 2017 +0200

----------------------------------------------------------------------
 .../kafka/FlinkKafkaProducer011Tests.java       | 120 +++++++++++++++++++
 1 file changed, 120 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2c734508/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
index dd21bf4..69c3ceb 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
@@ -42,11 +43,17 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Properties;
 import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
+import static org.apache.flink.util.Preconditions.checkState;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 
@@ -316,6 +323,119 @@ public class FlinkKafkaProducer011Tests extends KafkaTestBase {
 		deleteTestTopic(topic);
 	}
 
+	/**
+	 * Each instance of FlinkKafkaProducer011 uses it's own pool of transactional ids. After the restore from checkpoint
+	 * transactional ids are redistributed across the subtasks. In case of scale down, the surplus transactional ids
+	 * are dropped. In case of scale up, new one are generated (for the new subtasks). This test make sure that sequence
+	 * of scaling down and up again works fine. Especially it checks whether the newly generated ids in scaling up
+	 * do not overlap with ids that were used before scaling down. For example we start with 4 ids and parallelism 4:
+	 * [1], [2], [3], [4] - one assigned per each subtask
+	 * we scale down to parallelism 2:
+	 * [1, 2], [3, 4] - first subtask got id 1 and 2, second got ids 3 and 4
+	 * surplus ids are dropped from the pools and we scale up to parallelism 3:
+	 * [1 or 2], [3 or 4], [???]
+	 * new subtask have to generate new id(s), but he can not use ids that are potentially in use, so it has to generate
+	 * new ones that are greater then 4.
+	 */
+	@Test(timeout = 120_000L)
+	public void testScaleUpAfterScalingDown() throws Exception {
+		String topic = "scale-down-before-first-checkpoint";
+
+		final int parallelism1 = 4;
+		final int parallelism2 = 2;
+		final int parallelism3 = 3;
+		final int maxParallelism = Math.max(parallelism1, Math.max(parallelism2, parallelism3));
+
+		List<OperatorStateHandle> operatorStateHandles = repartitionAndExecute(
+			topic,
+			Collections.emptyList(),
+			parallelism1,
+			maxParallelism,
+			IntStream.range(0, parallelism1).boxed().iterator());
+
+		operatorStateHandles = repartitionAndExecute(
+			topic,
+			operatorStateHandles,
+			parallelism2,
+			maxParallelism,
+			IntStream.range(parallelism1,  parallelism1 + parallelism2).boxed().iterator());
+
+		operatorStateHandles = repartitionAndExecute(
+			topic,
+			operatorStateHandles,
+			parallelism3,
+			maxParallelism,
+			IntStream.range(parallelism1 + parallelism2,  parallelism1 + parallelism2 + parallelism3).boxed().iterator());
+
+		// After each previous repartitionAndExecute call, we are left with some lingering transactions, that would
+		// not allow us to read all committed messages from the topic. Thus we initialize operators from
+		// operatorStateHandles once more, but without any new data. This should terminate all ongoing transactions.
+
+		operatorStateHandles = repartitionAndExecute(
+			topic,
+			operatorStateHandles,
+			1,
+			maxParallelism,
+			Collections.emptyIterator());
+
+		assertExactlyOnceForTopic(
+			createProperties(),
+			topic,
+			0,
+			IntStream.range(0, parallelism1 + parallelism2 + parallelism3).boxed().collect(Collectors.toList()),
+			30_000L);
+		deleteTestTopic(topic);
+	}
+
+	private List<OperatorStateHandle> repartitionAndExecute(
+			String topic,
+			List<OperatorStateHandle> inputStates,
+			int parallelism,
+			int maxParallelism,
+			Iterator<Integer> inputData) throws Exception {
+
+		List<OperatorStateHandle> outputStates = new ArrayList<>();
+		List<OneInputStreamOperatorTestHarness<Integer, Object>> testHarnesses = new ArrayList<>();
+
+		for (int subtaskIndex = 0; subtaskIndex < parallelism; subtaskIndex++) {
+			OneInputStreamOperatorTestHarness<Integer, Object> testHarness =
+				createTestHarness(topic, maxParallelism, parallelism, subtaskIndex);
+			testHarnesses.add(testHarness);
+
+			testHarness.setup();
+
+			testHarness.initializeState(new OperatorStateHandles(
+				0,
+				Collections.emptyList(),
+				Collections.emptyList(),
+				inputStates,
+				Collections.emptyList()));
+			testHarness.open();
+
+			if (inputData.hasNext()) {
+				int nextValue = inputData.next();
+				testHarness.processElement(nextValue, 0);
+				OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+
+				outputStates.addAll(snapshot.getManagedOperatorState());
+				checkState(snapshot.getRawOperatorState() == null, "Unexpected raw operator state");
+				checkState(snapshot.getManagedKeyedState() == null, "Unexpected managed keyed state");
+				checkState(snapshot.getRawKeyedState() == null, "Unexpected raw keyed state");
+
+				for (int i = 1; i < FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE - 1; i++) {
+					testHarness.processElement(-nextValue, 0);
+					testHarness.snapshot(i, 0);
+				}
+			}
+		}
+
+		for (OneInputStreamOperatorTestHarness<Integer, Object> testHarness : testHarnesses) {
+			testHarness.close();
+		}
+
+		return outputStates;
+	}
+
 	@Test
 	public void testRecoverCommittedTransaction() throws Exception {
 		String topic = "flink-kafka-producer-recover-committed-transaction";


[7/9] flink git commit: [FLINK-6988][kafka] Add test for failure before before checkpoint and scaling down

Posted by al...@apache.org.
[FLINK-6988][kafka] Add test for failure before before checkpoint and scaling down


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4ada50b3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4ada50b3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4ada50b3

Branch: refs/heads/master
Commit: 4ada50b3dd7c4af9735585a8c45eda4de10bb6e5
Parents: 867c012
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Thu Aug 24 14:16:55 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Oct 9 18:58:36 2017 +0200

----------------------------------------------------------------------
 .../kafka/FlinkKafkaProducer011Tests.java       | 114 ++++++++++++++++---
 .../util/OneInputStreamOperatorTestHarness.java |  11 ++
 2 files changed, 108 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4ada50b3/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
index 51410da..dd21bf4 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
@@ -37,6 +37,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -258,26 +259,61 @@ public class FlinkKafkaProducer011Tests extends KafkaTestBase {
 		deleteTestTopic(topic);
 	}
 
-	private OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness(String topic) throws Exception {
-		Properties properties = createProperties();
+	/**
+	 * This tests checks whether FlinkKafkaProducer011 correctly aborts lingering transactions after a failure,
+	 * which happened before first checkpoint and was followed up by reducing the parallelism.
+	 * If such transactions were left alone lingering it consumers would be unable to read committed records
+	 * that were created after this lingering transaction.
+	 */
+	@Test(timeout = 120_000L)
+	public void testScaleDownBeforeFirstCheckpoint() throws Exception {
+		String topic = "scale-down-before-first-checkpoint";
+
+		List<AutoCloseable> operatorsToClose = new ArrayList<>();
+		int preScaleDownParallelism = Math.max(2, FlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTOR);
+		for (int subtaskIndex = 0; subtaskIndex < preScaleDownParallelism; subtaskIndex++) {
+			OneInputStreamOperatorTestHarness<Integer, Object> preScaleDownOperator = createTestHarness(
+				topic,
+				preScaleDownParallelism,
+				preScaleDownParallelism,
+				subtaskIndex);
+
+			preScaleDownOperator.setup();
+			preScaleDownOperator.open();
+			preScaleDownOperator.processElement(subtaskIndex * 2, 0);
+			preScaleDownOperator.snapshot(0, 1);
+			preScaleDownOperator.processElement(subtaskIndex * 2 + 1, 2);
+
+			operatorsToClose.add(preScaleDownOperator);
+		}
 
-		FlinkKafkaProducer011<Integer> kafkaProducer = new FlinkKafkaProducer011<>(
-			topic,
-			integerKeyedSerializationSchema,
-			properties,
-			FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
+		// do not close previous testHarnesses to make sure that closing do not clean up something (in case of failure
+		// there might not be any close)
 
-		return new OneInputStreamOperatorTestHarness<>(
-			new StreamSink<>(kafkaProducer),
-			IntSerializer.INSTANCE);
-	}
+		// After previous failure simulate restarting application with smaller parallelism
+		OneInputStreamOperatorTestHarness<Integer, Object> postScaleDownOperator1 = createTestHarness(topic, 1, 1, 0);
 
-	private Properties createProperties() {
-		Properties properties = new Properties();
-		properties.putAll(standardProps);
-		properties.putAll(secureProps);
-		properties.put(FlinkKafkaProducer011.KEY_DISABLE_METRICS, "true");
-		return properties;
+		postScaleDownOperator1.setup();
+		postScaleDownOperator1.open();
+
+		// write and commit more records, after potentially lingering transactions
+		postScaleDownOperator1.processElement(46, 7);
+		postScaleDownOperator1.snapshot(4, 8);
+		postScaleDownOperator1.processElement(47, 9);
+		postScaleDownOperator1.notifyOfCompletedCheckpoint(4);
+
+		//now we should have:
+		// - records 42, 43, 44 and 45 in aborted transactions
+		// - committed transaction with record 46
+		// - pending transaction with record 47
+		assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(46), 30_000L);
+
+		postScaleDownOperator1.close();
+		// ignore ProducerFencedExceptions, because postScaleDownOperator1 could reuse transactional ids.
+		for (AutoCloseable operatorToClose : operatorsToClose) {
+			closeIgnoringProducerFenced(operatorToClose);
+		}
+		deleteTestTopic(topic);
 	}
 
 	@Test
@@ -363,4 +399,48 @@ public class FlinkKafkaProducer011Tests extends KafkaTestBase {
 			assertEquals(expectedValue, record.value());
 		}
 	}
+
+	private void closeIgnoringProducerFenced(AutoCloseable autoCloseable) throws Exception {
+		try {
+			autoCloseable.close();
+		}
+		catch (Exception ex) {
+			if (!(ex.getCause() instanceof ProducerFencedException)) {
+				throw ex;
+			}
+		}
+	}
+
+	private OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness(String topic) throws Exception {
+		return createTestHarness(topic, 1, 1, 0);
+	}
+
+	private OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness(
+		String topic,
+		int maxParallelism,
+		int parallelism,
+		int subtaskIndex) throws Exception {
+		Properties properties = createProperties();
+
+		FlinkKafkaProducer011<Integer> kafkaProducer = new FlinkKafkaProducer011<>(
+			topic,
+			integerKeyedSerializationSchema,
+			properties,
+			FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
+
+		return new OneInputStreamOperatorTestHarness<>(
+			new StreamSink<>(kafkaProducer),
+			maxParallelism,
+			parallelism,
+			subtaskIndex,
+			IntSerializer.INSTANCE);
+	}
+
+	private Properties createProperties() {
+		Properties properties = new Properties();
+		properties.putAll(standardProps);
+		properties.putAll(secureProps);
+		properties.put(FlinkKafkaProducer011.KEY_DISABLE_METRICS, "true");
+		return properties;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4ada50b3/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index c8fa2a4..5c7d986 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -51,6 +51,17 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
 
 	public OneInputStreamOperatorTestHarness(
 		OneInputStreamOperator<IN, OUT> operator,
+		int maxParallelism,
+		int parallelism,
+		int subtaskIndex,
+		TypeSerializer<IN> typeSerializerIn) throws Exception {
+		this(operator, maxParallelism, parallelism, subtaskIndex);
+
+		config.setTypeSerializerIn1(Preconditions.checkNotNull(typeSerializerIn));
+	}
+
+	public OneInputStreamOperatorTestHarness(
+		OneInputStreamOperator<IN, OUT> operator,
 		TypeSerializer<IN> typeSerializerIn,
 		Environment environment) throws Exception {
 		this(operator, environment);


[5/9] flink git commit: [hotfix] Don't use deprecated writeWithTimestamps in Kafka 0.10 tests

Posted by al...@apache.org.
[hotfix] Don't use deprecated writeWithTimestamps in Kafka 0.10 tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9a3621b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9a3621b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9a3621b8

Branch: refs/heads/master
Commit: 9a3621b842d2bf6b76e394f1412dd27475180ac2
Parents: 08bfdae
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Sep 28 14:53:24 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Oct 9 18:58:36 2017 +0200

----------------------------------------------------------------------
 .../streaming/connectors/kafka/KafkaTestEnvironmentImpl.java    | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9a3621b8/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 5a5caad..d0e935b 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -168,7 +168,10 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 
 	@Override
 	public <T> DataStreamSink<T> writeToKafkaWithTimestamps(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props) {
-		return FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, topic, serSchema, props);
+		FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props);
+		prod.setFlushOnCheckpoint(true);
+		prod.setWriteTimestampToKafka(true);
+		return stream.addSink(prod);
 	}
 
 	@Override


[2/9] flink git commit: [hotfix][streaming] Fix typo in parameter and unify naming in test harnesses

Posted by al...@apache.org.
[hotfix][streaming] Fix typo in parameter and unify naming in test harnesses


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/867c0124
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/867c0124
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/867c0124

Branch: refs/heads/master
Commit: 867c0124e2959ea3c90dab13cc12ba43c2eb0f64
Parents: 2f651e9
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Thu Aug 24 13:16:14 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Oct 9 18:58:36 2017 +0200

----------------------------------------------------------------------
 .../flink/streaming/util/AbstractStreamOperatorTestHarness.java  | 4 ++--
 .../flink/streaming/util/OneInputStreamOperatorTestHarness.java  | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/867c0124/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 793e8f6..3d1b6fd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -120,7 +120,7 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 	public AbstractStreamOperatorTestHarness(
 		StreamOperator<OUT> operator,
 		int maxParallelism,
-		int numSubtasks,
+		int parallelism,
 		int subtaskIndex) throws Exception {
 
 		this(
@@ -133,7 +133,7 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 				new Configuration(),
 				new ExecutionConfig(),
 				maxParallelism,
-				numSubtasks,
+				parallelism,
 				subtaskIndex));
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/867c0124/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 8a0996f..c8fa2a4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -65,9 +65,9 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
 	public OneInputStreamOperatorTestHarness(
 			OneInputStreamOperator<IN, OUT> operator,
 			int maxParallelism,
-			int numTubtasks,
+			int parallelism,
 			int subtaskIndex) throws Exception {
-		super(operator, maxParallelism, numTubtasks, subtaskIndex);
+		super(operator, maxParallelism, parallelism, subtaskIndex);
 
 		this.oneInputOperator = operator;
 	}


[4/9] flink git commit: [FLINK-6988] Add Kafka 0.11 connector maven module

Posted by al...@apache.org.
[FLINK-6988] Add Kafka 0.11 connector maven module


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a35c356
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a35c356
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a35c356

Branch: refs/heads/master
Commit: 7a35c35610815f01e89ed340b4f116c950046c20
Parents: 49cef0c
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Wed Sep 6 16:42:59 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Oct 9 18:58:36 2017 +0200

----------------------------------------------------------------------
 .../flink-connector-kafka-0.11/pom.xml          | 213 +++++++++++++++++++
 .../src/main/resources/log4j.properties         |  28 +++
 .../src/test/resources/log4j-test.properties    |  30 +++
 flink-connectors/pom.xml                        |  12 ++
 tools/travis_mvn_watchdog.sh                    |   4 +
 5 files changed, 287 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7a35c356/flink-connectors/flink-connector-kafka-0.11/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/pom.xml b/flink-connectors/flink-connector-kafka-0.11/pom.xml
new file mode 100644
index 0000000..c41f697
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/pom.xml
@@ -0,0 +1,213 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connectors</artifactId>
+		<version>1.4-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
+	<name>flink-connector-kafka-0.11</name>
+
+	<packaging>jar</packaging>
+
+	<!-- Allow users to pass custom connector versions -->
+	<properties>
+		<kafka.version>0.11.0.0</kafka.version>
+	</properties>
+
+	<dependencies>
+
+		<!-- core dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.apache.kafka</groupId>
+					<artifactId>kafka_${scala.binary.version}</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+		<!-- streaming-java dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<!-- Add Kafka 0.11.x as a dependency -->
+
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka-clients</artifactId>
+			<version>${kafka.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+			<!-- Projects depending on this project,
+			won't depend on flink-table. -->
+			<optional>true</optional>
+		</dependency>
+
+		<!-- test dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.9_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<exclusions>
+				<!-- exclude Kafka dependencies -->
+				<exclusion>
+					<groupId>org.apache.kafka</groupId>
+					<artifactId>kafka_${scala.binary.version}</artifactId>
+				</exclusion>
+			</exclusions>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-base_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<exclusions>
+				<!-- exclude Kafka dependencies -->
+				<exclusion>
+					<groupId>org.apache.kafka</groupId>
+					<artifactId>kafka_${scala.binary.version}</artifactId>
+				</exclusion>
+			</exclusions>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<!-- include 0.11 server for tests  -->
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka_${scala.binary.version}</artifactId>
+			<version>${kafka.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-tests_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-metrics-jmx</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+						<configuration>
+							<includes>
+								<include>**/KafkaTestEnvironmentImpl*</include>
+							</includes>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-source-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>attach-test-sources</id>
+						<goals>
+							<goal>test-jar-no-fork</goal>
+						</goals>
+						<configuration>
+							<includes>
+								<include>**/KafkaTestEnvironmentImpl*</include>
+							</includes>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<configuration>
+					<!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
+					<forkCount>1</forkCount>
+					<argLine>-Xms256m -Xmx2048m -Dlog4j.configuration=${log4j.configuration} -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/7a35c356/flink-connectors/flink-connector-kafka-0.11/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/log4j.properties b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/log4j.properties
new file mode 100644
index 0000000..6eef174
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/log4j.properties
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+

http://git-wip-us.apache.org/repos/asf/flink/blob/7a35c356/flink-connectors/flink-connector-kafka-0.11/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-kafka-0.11/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..fbeb110
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/resources/log4j-test.properties
@@ -0,0 +1,30 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+log4j.logger.org.apache.zookeeper=OFF, testlogger
+log4j.logger.state.change.logger=OFF, testlogger
+log4j.logger.kafka=OFF, testlogger

http://git-wip-us.apache.org/repos/asf/flink/blob/7a35c356/flink-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index bc3f82f..97c9f20 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -75,6 +75,18 @@ under the License.
 
 	<!-- See main pom.xml for explanation of profiles -->
 	<profiles>
+		<!-- Kafka 0.11 does not support scala 2.10-->
+		<profile>
+			<id>scala-2.11</id>
+			<activation>
+				<property>
+					<name>!scala-2.10</name>
+				</property>
+			</activation>
+			<modules>
+				<module>flink-connector-kafka-0.11</module>
+			</modules>
+		</profile>
 		<!--
 			We include the kinesis module only optionally because it contains a dependency
 			licenced under the "Amazon Software License".

http://git-wip-us.apache.org/repos/asf/flink/blob/7a35c356/tools/travis_mvn_watchdog.sh
----------------------------------------------------------------------
diff --git a/tools/travis_mvn_watchdog.sh b/tools/travis_mvn_watchdog.sh
index 3ecc268..6808e97 100755
--- a/tools/travis_mvn_watchdog.sh
+++ b/tools/travis_mvn_watchdog.sh
@@ -93,6 +93,10 @@ flink-connectors/flink-connector-twitter"
 MODULES_TESTS="\
 flink-tests"
 
+if [[ $PROFILE != *"scala-2.10"* ]]; then
+	MODULES_CONNECTORS="$MODULES_CONNECTORS,flink-connectors/flink-connector-kafka-0.11"
+fi
+
 if [[ $PROFILE == *"include-kinesis"* ]]; then
 	case $TEST in
 		(connectors)


[8/9] flink git commit: [FLINK-6988][kafka] Add flink-connector-kafka-0.11 with exactly-once semantic

Posted by al...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
new file mode 100644
index 0000000..6d259fa
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * IT cases for Kafka 0.11 .
+ */
+public class Kafka011ITCase extends KafkaConsumerTestBase {
+
+	@BeforeClass
+	public static void prepare() throws ClassNotFoundException {
+		KafkaProducerTestBase.prepare();
+		((KafkaTestEnvironmentImpl) kafkaServer).setProducerSemantic(FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Suite of Tests
+	// ------------------------------------------------------------------------
+
+	@Test(timeout = 60000)
+	public void testFailOnNoBroker() throws Exception {
+		runFailOnNoBrokerTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testConcurrentProducerConsumerTopology() throws Exception {
+		runSimpleConcurrentProducerConsumerTopology();
+	}
+
+	@Test(timeout = 60000)
+	public void testKeyValueSupport() throws Exception {
+		runKeyValueTest();
+	}
+
+	// --- canceling / failures ---
+
+	@Test(timeout = 60000)
+	public void testCancelingEmptyTopic() throws Exception {
+		runCancelingOnEmptyInputTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testCancelingFullTopic() throws Exception {
+		runCancelingOnFullInputTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testFailOnDeploy() throws Exception {
+		runFailOnDeployTest();
+	}
+
+	// --- source to partition mappings and exactly once ---
+
+	@Test(timeout = 60000)
+	public void testOneToOneSources() throws Exception {
+		runOneToOneExactlyOnceTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testOneSourceMultiplePartitions() throws Exception {
+		runOneSourceMultiplePartitionsExactlyOnceTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testMultipleSourcesOnePartition() throws Exception {
+		runMultipleSourcesOnePartitionExactlyOnceTest();
+	}
+
+	// --- broker failure ---
+
+	@Test(timeout = 60000)
+	public void testBrokerFailure() throws Exception {
+		runBrokerFailureTest();
+	}
+
+	// --- special executions ---
+
+	@Test(timeout = 60000)
+	public void testBigRecordJob() throws Exception {
+		runBigRecordTestTopology();
+	}
+
+	@Test(timeout = 60000)
+	public void testMultipleTopics() throws Exception {
+		runProduceConsumeMultipleTopics();
+	}
+
+	@Test(timeout = 60000)
+	public void testAllDeletes() throws Exception {
+		runAllDeletesTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testMetricsAndEndOfStream() throws Exception {
+		runEndOfStreamTest();
+	}
+
+	// --- startup mode ---
+
+	@Test(timeout = 60000)
+	public void testStartFromEarliestOffsets() throws Exception {
+		runStartFromEarliestOffsets();
+	}
+
+	@Test(timeout = 60000)
+	public void testStartFromLatestOffsets() throws Exception {
+		runStartFromLatestOffsets();
+	}
+
+	@Test(timeout = 60000)
+	public void testStartFromGroupOffsets() throws Exception {
+		runStartFromGroupOffsets();
+	}
+
+	@Test(timeout = 60000)
+	public void testStartFromSpecificOffsets() throws Exception {
+		runStartFromSpecificOffsets();
+	}
+
+	// --- offset committing ---
+
+	@Test(timeout = 60000)
+	public void testCommitOffsetsToKafka() throws Exception {
+		runCommitOffsetsToKafka();
+	}
+
+	@Test(timeout = 60000)
+	public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
+		runAutoOffsetRetrievalAndCommitToKafka();
+	}
+
+	/**
+	 * Kafka 0.11 specific test, ensuring Timestamps are properly written to and read from Kafka.
+	 */
+	@Test(timeout = 60000)
+	public void testTimestamps() throws Exception {
+
+		final String topic = "tstopic";
+		createTestTopic(topic, 3, 1);
+
+		// ---------- Produce an event time stream into Kafka -------------------
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+		env.getConfig().disableSysoutLogging();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		DataStream<Long> streamWithTimestamps = env.addSource(new SourceFunction<Long>() {
+			private static final long serialVersionUID = -2255115836471289626L;
+			boolean running = true;
+
+			@Override
+			public void run(SourceContext<Long> ctx) throws Exception {
+				long i = 0;
+				while (running) {
+					ctx.collectWithTimestamp(i, i * 2);
+					if (i++ == 1110L) {
+						running = false;
+					}
+				}
+			}
+
+			@Override
+			public void cancel() {
+				running = false;
+			}
+		});
+
+		final TypeInformationSerializationSchema<Long> longSer = new TypeInformationSerializationSchema<>(TypeInfoParser.<Long>parse("Long"), env.getConfig());
+		FlinkKafkaProducer011<Long> prod = new FlinkKafkaProducer011<>(topic, new KeyedSerializationSchemaWrapper<>(longSer), standardProps, Optional.of(new FlinkKafkaPartitioner<Long>() {
+			private static final long serialVersionUID = -6730989584364230617L;
+
+			@Override
+			public int partition(Long next, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+				return (int) (next % 3);
+			}
+		}));
+		prod.setWriteTimestampToKafka(true);
+
+		streamWithTimestamps.addSink(prod).setParallelism(3);
+
+		env.execute("Produce some");
+
+		// ---------- Consume stream from Kafka -------------------
+
+		env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+		env.getConfig().disableSysoutLogging();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		FlinkKafkaConsumer011<Long> kafkaSource = new FlinkKafkaConsumer011<>(topic, new LimitedLongDeserializer(), standardProps);
+		kafkaSource.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Long>() {
+			private static final long serialVersionUID = -4834111173247835189L;
+
+			@Nullable
+			@Override
+			public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) {
+				if (lastElement % 11 == 0) {
+					return new Watermark(lastElement);
+				}
+				return null;
+			}
+
+			@Override
+			public long extractTimestamp(Long element, long previousElementTimestamp) {
+				return previousElementTimestamp;
+			}
+		});
+
+		DataStream<Long> stream = env.addSource(kafkaSource);
+		GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
+		stream.transform("timestamp validating operator", objectTypeInfo, new TimestampValidatingOperator()).setParallelism(1);
+
+		env.execute("Consume again");
+
+		deleteTestTopic(topic);
+	}
+
+	private static class TimestampValidatingOperator extends StreamSink<Long> {
+
+		private static final long serialVersionUID = 1353168781235526806L;
+
+		public TimestampValidatingOperator() {
+			super(new SinkFunction<Long>() {
+				private static final long serialVersionUID = -6676565693361786524L;
+
+				@Override
+				public void invoke(Long value) throws Exception {
+					throw new RuntimeException("Unexpected");
+				}
+			});
+		}
+
+		long elCount = 0;
+		long wmCount = 0;
+		long lastWM = Long.MIN_VALUE;
+
+		@Override
+		public void processElement(StreamRecord<Long> element) throws Exception {
+			elCount++;
+			if (element.getValue() * 2 != element.getTimestamp()) {
+				throw new RuntimeException("Invalid timestamp: " + element);
+			}
+		}
+
+		@Override
+		public void processWatermark(Watermark mark) throws Exception {
+			wmCount++;
+
+			if (lastWM <= mark.getTimestamp()) {
+				lastWM = mark.getTimestamp();
+			} else {
+				throw new RuntimeException("Received watermark higher than the last one");
+			}
+
+			if (mark.getTimestamp() % 11 != 0 && mark.getTimestamp() != Long.MAX_VALUE) {
+				throw new RuntimeException("Invalid watermark: " + mark.getTimestamp());
+			}
+		}
+
+		@Override
+		public void close() throws Exception {
+			super.close();
+			if (elCount != 1110L) {
+				throw new RuntimeException("Wrong final element count " + elCount);
+			}
+
+			if (wmCount <= 2) {
+				throw new RuntimeException("Almost no watermarks have been sent " + wmCount);
+			}
+		}
+	}
+
+	private static class LimitedLongDeserializer implements KeyedDeserializationSchema<Long> {
+
+		private static final long serialVersionUID = 6966177118923713521L;
+		private final TypeInformation<Long> ti;
+		private final TypeSerializer<Long> ser;
+		long cnt = 0;
+
+		public LimitedLongDeserializer() {
+			this.ti = TypeInfoParser.parse("Long");
+			this.ser = ti.createSerializer(new ExecutionConfig());
+		}
+
+		@Override
+		public TypeInformation<Long> getProducedType() {
+			return ti;
+		}
+
+		@Override
+		public Long deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
+			cnt++;
+			DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
+			Long e = ser.deserialize(in);
+			return e;
+		}
+
+		@Override
+		public boolean isEndOfStream(Long nextElement) {
+			return cnt > 1110L;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
new file mode 100644
index 0000000..c2e256c
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+import org.apache.flink.types.Row;
+
+import java.util.Properties;
+
+/**
+ * Tests for the {@link Kafka011JsonTableSource}.
+ */
+public class Kafka011JsonTableSourceTest extends KafkaTableSourceTestBase {
+
+	@Override
+	protected KafkaTableSource createTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo) {
+		return new Kafka011JsonTableSource(topic, properties, typeInfo);
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
+		return (Class) JsonRowDeserializationSchema.class;
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
+		return (Class) FlinkKafkaConsumer011.class;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerAtLeastOnceITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerAtLeastOnceITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerAtLeastOnceITCase.java
new file mode 100644
index 0000000..ad63662
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerAtLeastOnceITCase.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.junit.BeforeClass;
+
+/**
+ * IT cases for the {@link FlinkKafkaProducer011}.
+ */
+@SuppressWarnings("serial")
+public class Kafka011ProducerAtLeastOnceITCase extends KafkaProducerTestBase {
+
+	@BeforeClass
+	public static void prepare() throws ClassNotFoundException {
+		KafkaProducerTestBase.prepare();
+		((KafkaTestEnvironmentImpl) kafkaServer).setProducerSemantic(FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE);
+	}
+
+	@Override
+	public void testExactlyOnceRegularSink() throws Exception {
+		// disable test for at least once semantic
+	}
+
+	@Override
+	public void testExactlyOnceCustomOperator() throws Exception {
+		// disable test for at least once semantic
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java
new file mode 100644
index 0000000..1167238
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.junit.BeforeClass;
+
+/**
+ * IT cases for the {@link FlinkKafkaProducer011}.
+ */
+@SuppressWarnings("serial")
+public class Kafka011ProducerExactlyOnceITCase extends KafkaProducerTestBase {
+	@BeforeClass
+	public static void prepare() throws ClassNotFoundException {
+		KafkaProducerTestBase.prepare();
+		((KafkaTestEnvironmentImpl) kafkaServer).setProducerSemantic(FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
+	}
+
+	@Override
+	public void testOneToOneAtLeastOnceRegularSink() throws Exception {
+		// TODO: fix this test
+		// currently very often (~50% cases) KafkaProducer live locks itself on commitTransaction call.
+		// Somehow Kafka 0.11 doesn't play along with NetworkFailureProxy. This can either mean a bug in Kafka
+		// that it doesn't work well with some weird network failures, or the NetworkFailureProxy is a broken design
+		// and this test should be reimplemented in completely different way...
+	}
+
+	@Override
+	public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
+		// TODO: fix this test
+		// currently very often (~50% cases) KafkaProducer live locks itself on commitTransaction call.
+		// Somehow Kafka 0.11 doesn't play along with NetworkFailureProxy. This can either mean a bug in Kafka
+		// that it doesn't work well with some weird network failures, or the NetworkFailureProxy is a broken design
+		// and this test should be reimplemented in completely different way...
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
new file mode 100644
index 0000000..e81148b
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.networking.NetworkFailuresProxy;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.util.NetUtils;
+
+import kafka.admin.AdminUtils;
+import kafka.common.KafkaException;
+import kafka.metrics.KafkaMetricsReporter;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.ZkUtils;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.collections.list.UnmodifiableList;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.BindException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.UUID;
+
+import scala.collection.mutable.ArraySeq;
+
+import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * An implementation of the KafkaServerProvider for Kafka 0.11 .
+ */
+public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
+
+	protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);
+	private File tmpZkDir;
+	private File tmpKafkaParent;
+	private List<File> tmpKafkaDirs;
+	private List<KafkaServer> brokers;
+	private TestingServer zookeeper;
+	private String zookeeperConnectionString;
+	private String brokerConnectionString = "";
+	private Properties standardProps;
+	private FlinkKafkaProducer011.Semantic producerSemantic = FlinkKafkaProducer011.Semantic.EXACTLY_ONCE;
+	// 6 seconds is default. Seems to be too small for travis. 30 seconds
+	private int zkTimeout = 30000;
+	private Config config;
+
+	public String getBrokerConnectionString() {
+		return brokerConnectionString;
+	}
+
+	public void setProducerSemantic(FlinkKafkaProducer011.Semantic producerSemantic) {
+		this.producerSemantic = producerSemantic;
+	}
+
+	@Override
+	public Properties getStandardProperties() {
+		return standardProps;
+	}
+
+	@Override
+	public Properties getSecureProperties() {
+		Properties prop = new Properties();
+		if (config.isSecureMode()) {
+			prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
+			prop.put("security.protocol", "SASL_PLAINTEXT");
+			prop.put("sasl.kerberos.service.name", "kafka");
+
+			//add special timeout for Travis
+			prop.setProperty("zookeeper.session.timeout.ms", String.valueOf(zkTimeout));
+			prop.setProperty("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout));
+			prop.setProperty("metadata.fetch.timeout.ms", "120000");
+		}
+		return prop;
+	}
+
+	@Override
+	public String getVersion() {
+		return "0.11";
+	}
+
+	@Override
+	public List<KafkaServer> getBrokers() {
+		return brokers;
+	}
+
+	@Override
+	public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props) {
+		return new FlinkKafkaConsumer011<>(topics, readSchema, props);
+	}
+
+	@Override
+	public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties properties, String topic, int partition, long timeout) {
+		List<ConsumerRecord<K, V>> result = new ArrayList<>();
+
+		try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties)) {
+			consumer.assign(Arrays.asList(new TopicPartition(topic, partition)));
+
+			while (true) {
+				boolean processedAtLeastOneRecord = false;
+
+				// wait for new records with timeout and break the loop if we didn't get any
+				Iterator<ConsumerRecord<K, V>> iterator = consumer.poll(timeout).iterator();
+				while (iterator.hasNext()) {
+					ConsumerRecord<K, V> record = iterator.next();
+					result.add(record);
+					processedAtLeastOneRecord = true;
+				}
+
+				if (!processedAtLeastOneRecord) {
+					break;
+				}
+			}
+			consumer.commitSync();
+		}
+
+		return UnmodifiableList.decorate(result);
+	}
+
+	@Override
+	public <T> StreamSink<T> getProducerSink(String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
+		return new StreamSink<>(new FlinkKafkaProducer011<>(
+			topic,
+			serSchema,
+			props,
+			Optional.ofNullable(partitioner),
+			producerSemantic,
+			FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE));
+	}
+
+	@Override
+	public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
+		return stream.addSink(new FlinkKafkaProducer011<>(
+			topic,
+			serSchema,
+			props,
+			Optional.ofNullable(partitioner),
+			producerSemantic,
+			FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE));
+	}
+
+	@Override
+	public <T> DataStreamSink<T> writeToKafkaWithTimestamps(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props) {
+		FlinkKafkaProducer011<T> prod = new FlinkKafkaProducer011<>(
+			topic, serSchema, props, Optional.of(new FlinkFixedPartitioner<>()), producerSemantic, FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
+
+		prod.setWriteTimestampToKafka(true);
+
+		return stream.addSink(prod);
+	}
+
+	@Override
+	public KafkaOffsetHandler createOffsetHandler() {
+		return new KafkaOffsetHandlerImpl();
+	}
+
+	@Override
+	public void restartBroker(int leaderId) throws Exception {
+		brokers.set(leaderId, getKafkaServer(leaderId, tmpKafkaDirs.get(leaderId)));
+	}
+
+	@Override
+	public int getLeaderToShutDown(String topic) throws Exception {
+		ZkUtils zkUtils = getZkUtils();
+		try {
+			MetadataResponse.PartitionMetadata firstPart = null;
+			do {
+				if (firstPart != null) {
+					LOG.info("Unable to find leader. error code {}", firstPart.error().code());
+					// not the first try. Sleep a bit
+					Thread.sleep(150);
+				}
+
+				List<MetadataResponse.PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils).partitionMetadata();
+				firstPart = partitionMetadata.get(0);
+			}
+			while (firstPart.error().code() != 0);
+
+			return firstPart.leader().id();
+		} finally {
+			zkUtils.close();
+		}
+	}
+
+	@Override
+	public int getBrokerId(KafkaServer server) {
+		return server.config().brokerId();
+	}
+
+	@Override
+	public boolean isSecureRunSupported() {
+		return true;
+	}
+
+	@Override
+	public void prepare(Config config) {
+		//increase the timeout since in Travis ZK connection takes long time for secure connection.
+		if (config.isSecureMode()) {
+			//run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout
+			config.setKafkaServersNumber(1);
+			zkTimeout = zkTimeout * 15;
+		}
+		this.config = config;
+
+		File tempDir = new File(System.getProperty("java.io.tmpdir"));
+		tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
+		assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());
+
+		tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString()));
+		assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
+
+		tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber());
+		for (int i = 0; i < config.getKafkaServersNumber(); i++) {
+			File tmpDir = new File(tmpKafkaParent, "server-" + i);
+			assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
+			tmpKafkaDirs.add(tmpDir);
+		}
+
+		zookeeper = null;
+		brokers = null;
+
+		try {
+			zookeeper = new TestingServer(-1, tmpZkDir);
+			zookeeperConnectionString = zookeeper.getConnectString();
+			LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
+
+			LOG.info("Starting KafkaServer");
+			brokers = new ArrayList<>(config.getKafkaServersNumber());
+
+			ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT);
+			for (int i = 0; i < config.getKafkaServersNumber(); i++) {
+				KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i));
+				brokers.add(kafkaServer);
+				brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(listenerName));
+				brokerConnectionString +=  ",";
+			}
+
+			LOG.info("ZK and KafkaServer started.");
+		}
+		catch (Throwable t) {
+			t.printStackTrace();
+			fail("Test setup failed: " + t.getMessage());
+		}
+
+		standardProps = new Properties();
+		standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
+		standardProps.setProperty("bootstrap.servers", brokerConnectionString);
+		standardProps.setProperty("group.id", "flink-tests");
+		standardProps.setProperty("enable.auto.commit", "false");
+		standardProps.setProperty("zookeeper.session.timeout.ms", String.valueOf(zkTimeout));
+		standardProps.setProperty("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout));
+		standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 0.11 value)
+		standardProps.setProperty("max.partition.fetch.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
+	}
+
+	@Override
+	public void shutdown() {
+		for (KafkaServer broker : brokers) {
+			if (broker != null) {
+				broker.shutdown();
+			}
+		}
+		brokers.clear();
+
+		if (zookeeper != null) {
+			try {
+				zookeeper.stop();
+			}
+			catch (Exception e) {
+				LOG.warn("ZK.stop() failed", e);
+			}
+			zookeeper = null;
+		}
+
+		// clean up the temp spaces
+
+		if (tmpKafkaParent != null && tmpKafkaParent.exists()) {
+			try {
+				FileUtils.deleteDirectory(tmpKafkaParent);
+			}
+			catch (Exception e) {
+				// ignore
+			}
+		}
+		if (tmpZkDir != null && tmpZkDir.exists()) {
+			try {
+				FileUtils.deleteDirectory(tmpZkDir);
+			}
+			catch (Exception e) {
+				// ignore
+			}
+		}
+	}
+
+	public ZkUtils getZkUtils() {
+		ZkClient creator = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
+				Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
+		return ZkUtils.apply(creator, false);
+	}
+
+	@Override
+	public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties topicConfig) {
+		// create topic with one client
+		LOG.info("Creating topic {}", topic);
+
+		ZkUtils zkUtils = getZkUtils();
+		try {
+			AdminUtils.createTopic(zkUtils, topic, numberOfPartitions, replicationFactor, topicConfig, kafka.admin.RackAwareMode.Enforced$.MODULE$);
+		} finally {
+			zkUtils.close();
+		}
+
+		// validate that the topic has been created
+		final long deadline = System.nanoTime() + 30_000_000_000L;
+		do {
+			try {
+				if (config.isSecureMode()) {
+					//increase wait time since in Travis ZK timeout occurs frequently
+					int wait = zkTimeout / 100;
+					LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic);
+					Thread.sleep(wait);
+				} else {
+					Thread.sleep(100);
+				}
+			} catch (InterruptedException e) {
+				// restore interrupted state
+			}
+			// we could use AdminUtils.topicExists(zkUtils, topic) here, but it's results are
+			// not always correct.
+
+			// create a new ZK utils connection
+			ZkUtils checkZKConn = getZkUtils();
+			if (AdminUtils.topicExists(checkZKConn, topic)) {
+				checkZKConn.close();
+				return;
+			}
+			checkZKConn.close();
+		}
+		while (System.nanoTime() < deadline);
+		fail("Test topic could not be created");
+	}
+
+	@Override
+	public void deleteTestTopic(String topic) {
+		ZkUtils zkUtils = getZkUtils();
+		try {
+			LOG.info("Deleting topic {}", topic);
+
+			ZkClient zk = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
+				Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
+
+			AdminUtils.deleteTopic(zkUtils, topic);
+
+			zk.close();
+		} finally {
+			zkUtils.close();
+		}
+	}
+
+	/**
+	 * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed).
+	 */
+	protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Exception {
+		Properties kafkaProperties = new Properties();
+
+		// properties have to be Strings
+		kafkaProperties.put("advertised.host.name", KAFKA_HOST);
+		kafkaProperties.put("broker.id", Integer.toString(brokerId));
+		kafkaProperties.put("log.dir", tmpFolder.toString());
+		kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
+		kafkaProperties.put("message.max.bytes", String.valueOf(50 * 1024 * 1024));
+		kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024));
+		kafkaProperties.put("transaction.max.timeout.ms", Integer.toString(1000 * 60 * 60 * 2)); // 2hours
+
+		// for CI stability, increase zookeeper session timeout
+		kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout);
+		kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout);
+		if (config.getKafkaServerProperties() != null) {
+			kafkaProperties.putAll(config.getKafkaServerProperties());
+		}
+
+		final int numTries = 5;
+
+		for (int i = 1; i <= numTries; i++) {
+			int kafkaPort = NetUtils.getAvailablePort();
+			kafkaProperties.put("port", Integer.toString(kafkaPort));
+
+			if (config.isHideKafkaBehindProxy()) {
+				NetworkFailuresProxy proxy = createProxy(KAFKA_HOST, kafkaPort);
+				kafkaProperties.put("advertised.port", proxy.getLocalPort());
+			}
+
+			//to support secure kafka cluster
+			if (config.isSecureMode()) {
+				LOG.info("Adding Kafka secure configurations");
+				kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
+				kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
+				kafkaProperties.putAll(getSecureProperties());
+			}
+
+			KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
+
+			try {
+				scala.Option<String> stringNone = scala.Option.apply(null);
+				KafkaServer server = new KafkaServer(kafkaConfig, Time.SYSTEM, stringNone, new ArraySeq<KafkaMetricsReporter>(0));
+				server.startup();
+				return server;
+			}
+			catch (KafkaException e) {
+				if (e.getCause() instanceof BindException) {
+					// port conflict, retry...
+					LOG.info("Port conflict when starting Kafka Broker. Retrying...");
+				}
+				else {
+					throw e;
+				}
+			}
+		}
+
+		throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts.");
+	}
+
+	private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler {
+
+		private final KafkaConsumer<byte[], byte[]> offsetClient;
+
+		public KafkaOffsetHandlerImpl() {
+			Properties props = new Properties();
+			props.putAll(standardProps);
+			props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+			props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+
+			offsetClient = new KafkaConsumer<>(props);
+		}
+
+		@Override
+		public Long getCommittedOffset(String topicName, int partition) {
+			OffsetAndMetadata committed = offsetClient.committed(new TopicPartition(topicName, partition));
+			return (committed != null) ? committed.offset() : null;
+		}
+
+		@Override
+		public void setCommittedOffset(String topicName, int partition, long offset) {
+			Map<TopicPartition, OffsetAndMetadata> partitionAndOffset = new HashMap<>();
+			partitionAndOffset.put(new TopicPartition(topicName, partition), new OffsetAndMetadata(offset));
+			offsetClient.commitSync(partitionAndOffset);
+		}
+
+		@Override
+		public void close() {
+			offsetClient.close();
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
index 681fe02..c3c9c07 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
@@ -23,6 +23,15 @@ package org.apache.flink.streaming.connectors.kafka;
  */
 @SuppressWarnings("serial")
 public class Kafka08ProducerITCase extends KafkaProducerTestBase {
+	@Override
+	public void testExactlyOnceRegularSink() throws Exception {
+		// Kafka08 does not support exactly once semantic
+	}
+
+	@Override
+	public void testExactlyOnceCustomOperator() throws Exception {
+		// Kafka08 does not support exactly once semantic
+	}
 
 	@Override
 	public void testOneToOneAtLeastOnceRegularSink() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
index 847f818..b34132f 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
@@ -24,6 +24,16 @@ package org.apache.flink.streaming.connectors.kafka;
 @SuppressWarnings("serial")
 public class Kafka09ProducerITCase extends KafkaProducerTestBase {
 	@Override
+	public void testExactlyOnceRegularSink() throws Exception {
+		// Kafka08 does not support exactly once semantic
+	}
+
+	@Override
+	public void testExactlyOnceCustomOperator() throws Exception {
+		// Kafka08 does not support exactly once semantic
+	}
+
+	@Override
 	public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
 		// Disable this test since FlinkKafka09Producer doesn't support custom operator mode
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index fda6832..e9a0331 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -174,7 +174,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			stream.print();
 			see.execute("No broker test");
 		} catch (JobExecutionException jee) {
-			if (kafkaServer.getVersion().equals("0.9") || kafkaServer.getVersion().equals("0.10")) {
+			if (kafkaServer.getVersion().equals("0.9") || kafkaServer.getVersion().equals("0.10") || kafkaServer.getVersion().equals("0.11")) {
 				assertTrue(jee.getCause() instanceof TimeoutException);
 
 				TimeoutException te = (TimeoutException) jee.getCause();

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 35607dd..e1ba074 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -38,26 +38,25 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
+import org.apache.flink.streaming.connectors.kafka.testutils.IntegerSource;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.test.util.TestUtils;
 import org.apache.flink.util.Preconditions;
 
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.Test;
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 
 import static org.apache.flink.test.util.TestUtils.tryExecute;
 import static org.junit.Assert.assertEquals;
@@ -295,38 +294,79 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 	}
 
 	/**
-	 * We manually handle the timeout instead of using JUnit's timeout to return failure instead of timeout error.
-	 * After timeout we assume that there are missing records and there is a bug, not that the test has run out of time.
+	 * Tests the exactly-once semantic for the simple writes into Kafka.
 	 */
-	private void assertAtLeastOnceForTopic(
-			Properties properties,
-			String topic,
-			int partition,
-			Set<Integer> expectedElements,
-			long timeoutMillis) throws Exception {
-
-		long startMillis = System.currentTimeMillis();
-		Set<Integer> actualElements = new HashSet<>();
-
-		// until we timeout...
-		while (System.currentTimeMillis() < startMillis + timeoutMillis) {
-			properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
-			properties.put("value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
-
-			// query kafka for new records ...
-			Collection<ConsumerRecord<Integer, Integer>> records = kafkaServer.getAllRecordsFromTopic(properties, topic, partition, 100);
-
-			for (ConsumerRecord<Integer, Integer> record : records) {
-				actualElements.add(record.value());
-			}
+	@Test
+	public void testExactlyOnceRegularSink() throws Exception {
+		testExactlyOnce(true);
+	}
+
+	/**
+	 * Tests the exactly-once semantic for the simple writes into Kafka.
+	 */
+	@Test
+	public void testExactlyOnceCustomOperator() throws Exception {
+		testExactlyOnce(false);
+	}
+
+	/**
+	 * This test sets KafkaProducer so that it will  automatically flush the data and
+	 * and fails the broker to check whether flushed records since last checkpoint were not duplicated.
+	 */
+	protected void testExactlyOnce(boolean regularSink) throws Exception {
+		final String topic = regularSink ? "exactlyOnceTopicRegularSink" : "exactlyTopicCustomOperator";
+		final int partition = 0;
+		final int numElements = 1000;
+		final int failAfterElements = 333;
+
+		createTestTopic(topic, 1, 1);
+
+		TypeInformationSerializationSchema<Integer> schema = new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+		KeyedSerializationSchema<Integer> keyedSerializationSchema = new KeyedSerializationSchemaWrapper(schema);
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.enableCheckpointing(500);
+		env.setParallelism(1);
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+		env.getConfig().disableSysoutLogging();
+
+		Properties properties = new Properties();
+		properties.putAll(standardProps);
+		properties.putAll(secureProps);
 
-			// succeed if we got all expectedElements
-			if (actualElements.containsAll(expectedElements)) {
-				return;
+		// process exactly failAfterElements number of elements and then shutdown Kafka broker and fail application
+		List<Integer> expectedElements = getIntegersSequence(numElements);
+
+		DataStream<Integer> inputStream = env
+			.addSource(new IntegerSource(numElements))
+			.map(new FailingIdentityMapper<Integer>(failAfterElements));
+
+		FlinkKafkaPartitioner<Integer> partitioner = new FlinkKafkaPartitioner<Integer>() {
+			@Override
+			public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+				return partition;
 			}
+		};
+		if (regularSink) {
+			StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, partitioner);
+			inputStream.addSink(kafkaSink.getUserFunction());
+		}
+		else {
+			kafkaServer.produceIntoKafka(inputStream, topic, keyedSerializationSchema, properties, partitioner);
 		}
 
-		fail(String.format("Expected to contain all of: <%s>, but was: <%s>", expectedElements, actualElements));
+		FailingIdentityMapper.failedBefore = false;
+		TestUtils.tryExecute(env, "Exactly once test");
+
+		// assert that before failure we successfully snapshot/flushed all expected elements
+		assertExactlyOnceForTopic(
+			properties,
+			topic,
+			partition,
+			expectedElements,
+			30000L);
+
+		deleteTestTopic(topic);
 	}
 
 	private List<Integer> getIntegersSequence(int size) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index f8792e5..fcdb59b 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -32,6 +32,7 @@ import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -39,11 +40,18 @@ import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import scala.concurrent.duration.FiniteDuration;
 
+import static org.junit.Assert.fail;
+
 /**
  * The base for the Kafka tests. It brings up:
  * <ul>
@@ -209,4 +217,80 @@ public abstract class KafkaTestBase extends TestLogger {
 		kafkaServer.deleteTestTopic(topic);
 	}
 
+	/**
+	 * We manually handle the timeout instead of using JUnit's timeout to return failure instead of timeout error.
+	 * After timeout we assume that there are missing records and there is a bug, not that the test has run out of time.
+	 */
+	protected void assertAtLeastOnceForTopic(
+			Properties properties,
+			String topic,
+			int partition,
+			Set<Integer> expectedElements,
+			long timeoutMillis) throws Exception {
+
+		long startMillis = System.currentTimeMillis();
+		Set<Integer> actualElements = new HashSet<>();
+
+		// until we timeout...
+		while (System.currentTimeMillis() < startMillis + timeoutMillis) {
+			properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
+			properties.put("value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
+
+			// query kafka for new records ...
+			Collection<ConsumerRecord<Integer, Integer>> records = kafkaServer.getAllRecordsFromTopic(properties, topic, partition, 100);
+
+			for (ConsumerRecord<Integer, Integer> record : records) {
+				actualElements.add(record.value());
+			}
+
+			// succeed if we got all expectedElements
+			if (actualElements.containsAll(expectedElements)) {
+				return;
+			}
+		}
+
+		fail(String.format("Expected to contain all of: <%s>, but was: <%s>", expectedElements, actualElements));
+	}
+
+	/**
+	 * We manually handle the timeout instead of using JUnit's timeout to return failure instead of timeout error.
+	 * After timeout we assume that there are missing records and there is a bug, not that the test has run out of time.
+	 */
+	protected void assertExactlyOnceForTopic(
+			Properties properties,
+			String topic,
+			int partition,
+			List<Integer> expectedElements,
+			long timeoutMillis) throws Exception {
+
+		long startMillis = System.currentTimeMillis();
+		List<Integer> actualElements = new ArrayList<>();
+
+		Properties consumerProperties = new Properties();
+		consumerProperties.putAll(properties);
+		consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
+		consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
+		consumerProperties.put("isolation.level", "read_committed");
+
+		// until we timeout...
+		while (System.currentTimeMillis() < startMillis + timeoutMillis) {
+			// query kafka for new records ...
+			Collection<ConsumerRecord<Integer, Integer>> records = kafkaServer.getAllRecordsFromTopic(consumerProperties, topic, partition, 1000);
+
+			for (ConsumerRecord<Integer, Integer> record : records) {
+				actualElements.add(record.value());
+			}
+
+			// succeed if we got all expectedElements
+			if (actualElements.equals(expectedElements)) {
+				return;
+			}
+			// fail early if we already have too many elements
+			if (actualElements.size() > expectedElements.size()) {
+				break;
+			}
+		}
+
+		fail(String.format("Expected number of elements: <%s>, but was: <%s>", expectedElements.size(), actualElements.size()));
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/IntegerSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/IntegerSource.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/IntegerSource.java
new file mode 100644
index 0000000..ef50766
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/IntegerSource.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A Flink source that servers integers, but it completes only after a completed checkpoint after serving
+ * all of the elements.
+ */
+public class IntegerSource
+	extends RichParallelSourceFunction<Integer>
+	implements ListCheckpointed<Integer>, CheckpointListener {
+
+	/**
+	 * Blocker when the generator needs to wait for the checkpoint to happen.
+	 * Eager initialization means it must be serializable (pick any serializable type).
+	 */
+	private final Object blocker = new SerializableObject();
+
+	/**
+	 * The total number of events to generate.
+	 */
+	private final int numEventsTotal;
+
+	/**
+	 * The current position in the sequence of numbers.
+	 */
+	private int currentPosition = -1;
+
+	private long lastCheckpointTriggered;
+
+	private long lastCheckpointConfirmed;
+
+	private boolean restored;
+
+	private volatile boolean running = true;
+
+	public IntegerSource(int numEventsTotal) {
+		this.numEventsTotal = numEventsTotal;
+	}
+
+	@Override
+	public void run(SourceContext<Integer> ctx) throws Exception {
+
+		// each source subtask emits only the numbers where (num % parallelism == subtask_index)
+		final int stepSize = getRuntimeContext().getNumberOfParallelSubtasks();
+		int current = this.currentPosition >= 0 ? this.currentPosition : getRuntimeContext().getIndexOfThisSubtask();
+
+		while (this.running && current < this.numEventsTotal) {
+			// emit the next element
+			synchronized (ctx.getCheckpointLock()) {
+				ctx.collect(current);
+				current += stepSize;
+				this.currentPosition = current;
+			}
+			// give some time to trigger checkpoint while we are not holding the lock (to prevent starvation)
+			if (!restored && current % 10 == 0) {
+				Thread.sleep(1);
+			}
+		}
+
+		// after we are done, we need to wait for two more checkpoint to complete
+		// before finishing the program - that is to be on the safe side that
+		// the sink also got the "commit" notification for all relevant checkpoints
+		// and committed the data
+		final long lastCheckpoint;
+		synchronized (ctx.getCheckpointLock()) {
+			lastCheckpoint = this.lastCheckpointTriggered;
+		}
+
+		synchronized (this.blocker) {
+			while (this.lastCheckpointConfirmed <= lastCheckpoint + 1) {
+				this.blocker.wait();
+			}
+		}
+	}
+
+	@Override
+	public void cancel() {
+		this.running = false;
+	}
+
+	@Override
+	public List<Integer> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+		this.lastCheckpointTriggered = checkpointId;
+
+		return Collections.singletonList(this.currentPosition);
+	}
+
+	@Override
+	public void restoreState(List<Integer> state) throws Exception {
+		this.currentPosition = state.get(0);
+
+		// at least one checkpoint must have happened so far
+		this.lastCheckpointTriggered = 1L;
+		this.lastCheckpointConfirmed = 1L;
+		this.restored = true;
+	}
+
+	@Override
+	public void notifyCheckpointComplete(long checkpointId) throws Exception {
+		synchronized (blocker) {
+			this.lastCheckpointConfirmed = checkpointId;
+			blocker.notifyAll();
+		}
+	}
+}


[6/9] flink git commit: [FLINK-6988][kafka] Implement our own KafkaProducer class with transactions recovery

Posted by al...@apache.org.
[FLINK-6988][kafka] Implement our own KafkaProducer class with transactions recovery


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d20728ba
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d20728ba
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d20728ba

Branch: refs/heads/master
Commit: d20728ba46977704827252ee5029bef9f949d5ab
Parents: 7a35c35
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Wed Jul 12 15:14:13 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Oct 9 18:58:36 2017 +0200

----------------------------------------------------------------------
 .../kafka/internal/FlinkKafkaProducer.java      | 294 +++++++++++++++++++
 .../kafka/FlinkKafkaProducerTests.java          | 114 +++++++
 2 files changed, 408 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d20728ba/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
new file mode 100644
index 0000000..56b40d7
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Wrapper around KafkaProducer that allows to resume transactions in case of node failure, which allows to implement
+ * two phase commit algorithm for exactly-once semantic FlinkKafkaProducer.
+ *
+ * <p>For happy path usage is exactly the same as {@link org.apache.kafka.clients.producer.KafkaProducer}. User is
+ * expected to call:
+ *
+ * <ul>
+ *     <li>{@link FlinkKafkaProducer#initTransactions()}</li>
+ *     <li>{@link FlinkKafkaProducer#beginTransaction()}</li>
+ *     <li>{@link FlinkKafkaProducer#send(org.apache.kafka.clients.producer.ProducerRecord)}</li>
+ *     <li>{@link FlinkKafkaProducer#flush()}</li>
+ *     <li>{@link FlinkKafkaProducer#commitTransaction()}</li>
+ * </ul>
+ *
+ * <p>To actually implement two phase commit, it must be possible to always commit a transaction after pre-committing
+ * it (here, pre-commit is just a {@link FlinkKafkaProducer#flush()}). In case of some failure between
+ * {@link FlinkKafkaProducer#flush()} and {@link FlinkKafkaProducer#commitTransaction()} this class allows to resume
+ * interrupted transaction and commit if after a restart:
+ *
+ * <ul>
+ *     <li>{@link FlinkKafkaProducer#initTransactions()}</li>
+ *     <li>{@link FlinkKafkaProducer#beginTransaction()}</li>
+ *     <li>{@link FlinkKafkaProducer#send(org.apache.kafka.clients.producer.ProducerRecord)}</li>
+ *     <li>{@link FlinkKafkaProducer#flush()}</li>
+ *     <li>{@link FlinkKafkaProducer#getProducerId()}</li>
+ *     <li>{@link FlinkKafkaProducer#getEpoch()}</li>
+ *     <li>node failure... restore producerId and epoch from state</li>
+ *     <li>{@link FlinkKafkaProducer#resumeTransaction(long, short)}</li>
+ *     <li>{@link FlinkKafkaProducer#commitTransaction()}</li>
+ * </ul>
+ *
+ * <p>{@link FlinkKafkaProducer#resumeTransaction(long, short)} replaces {@link FlinkKafkaProducer#initTransactions()}
+ * as a way to obtain the producerId and epoch counters. It has to be done, because otherwise
+ * {@link FlinkKafkaProducer#initTransactions()} would automatically abort all on going transactions.
+ *
+ * <p>Second way this implementation differs from the reference {@link org.apache.kafka.clients.producer.KafkaProducer}
+ * is that this one actually flushes new partitions on {@link FlinkKafkaProducer#flush()} instead of on
+ * {@link FlinkKafkaProducer#commitTransaction()}.
+ *
+ * <p>The last one minor difference is that it allows to obtain the producerId and epoch counters via
+ * {@link FlinkKafkaProducer#getProducerId()} and {@link FlinkKafkaProducer#getEpoch()} methods (which are unfortunately
+ * private fields).
+ *
+ * <p>Those changes are compatible with Kafka's 0.11.0 REST API although it clearly was not the intention of the Kafka's
+ * API authors to make them possible.
+ *
+ * <p>Internally this implementation uses {@link org.apache.kafka.clients.producer.KafkaProducer} and implements
+ * required changes via Java Reflection API. It might not be the prettiest solution. An alternative would be to
+ * re-implement whole Kafka's 0.11 REST API client on our own.
+ */
+public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer.class);
+
+	private final KafkaProducer<K, V> kafkaProducer;
+
+	@Nullable
+	private final String transactionalId;
+
+	public FlinkKafkaProducer(Properties properties) {
+		transactionalId = properties.getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
+		kafkaProducer = new KafkaProducer<>(properties);
+	}
+
+	// -------------------------------- Simple proxy method calls --------------------------------
+
+	@Override
+	public void initTransactions() {
+		kafkaProducer.initTransactions();
+	}
+
+	@Override
+	public void beginTransaction() throws ProducerFencedException {
+		kafkaProducer.beginTransaction();
+	}
+
+	@Override
+	public void commitTransaction() throws ProducerFencedException {
+		kafkaProducer.commitTransaction();
+	}
+
+	@Override
+	public void abortTransaction() throws ProducerFencedException {
+		kafkaProducer.abortTransaction();
+	}
+
+	@Override
+	public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
+		kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId);
+	}
+
+	@Override
+	public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
+		return kafkaProducer.send(record);
+	}
+
+	@Override
+	public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
+		return kafkaProducer.send(record, callback);
+	}
+
+	@Override
+	public List<PartitionInfo> partitionsFor(String topic) {
+		return kafkaProducer.partitionsFor(topic);
+	}
+
+	@Override
+	public Map<MetricName, ? extends Metric> metrics() {
+		return kafkaProducer.metrics();
+	}
+
+	@Override
+	public void close() {
+		kafkaProducer.close();
+	}
+
+	@Override
+	public void close(long timeout, TimeUnit unit) {
+		kafkaProducer.close(timeout, unit);
+	}
+
+	// -------------------------------- New methods or methods with changed behaviour --------------------------------
+
+	@Override
+	public void flush() {
+		kafkaProducer.flush();
+		if (transactionalId != null) {
+			flushNewPartitions();
+		}
+	}
+
+	public void resumeTransaction(long producerId, short epoch) {
+		Preconditions.checkState(producerId >= 0 && epoch >= 0, "Incorrect values for producerId {} and epoch {}", producerId, epoch);
+		LOG.info("Attempting to resume transaction with producerId {} and epoch {}", producerId, epoch);
+
+		Object transactionManager = getValue(kafkaProducer, "transactionManager");
+		Object sequenceNumbers = getValue(transactionManager, "sequenceNumbers");
+
+		invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
+		invoke(sequenceNumbers, "clear");
+
+		Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
+		setValue(producerIdAndEpoch, "producerId", producerId);
+		setValue(producerIdAndEpoch, "epoch", epoch);
+
+		invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
+
+		invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
+		setValue(transactionManager, "transactionStarted", true);
+	}
+
+	public String getTransactionalId() {
+		return transactionalId;
+	}
+
+	public long getProducerId() {
+		Object transactionManager = getValue(kafkaProducer, "transactionManager");
+		Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
+		return (long) getValue(producerIdAndEpoch, "producerId");
+	}
+
+	public short getEpoch() {
+		Object transactionManager = getValue(kafkaProducer, "transactionManager");
+		Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
+		return (short) getValue(producerIdAndEpoch, "epoch");
+	}
+
+	@VisibleForTesting
+	public int getTransactionCoordinatorId() {
+		Object transactionManager = getValue(kafkaProducer, "transactionManager");
+		Node node = (Node) invoke(transactionManager, "coordinator", FindCoordinatorRequest.CoordinatorType.TRANSACTION);
+		return node.id();
+	}
+
+	private void flushNewPartitions() {
+		LOG.info("Flushing new partitions");
+		Object transactionManager = getValue(kafkaProducer, "transactionManager");
+		Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
+		invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
+		TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
+		Object sender = getValue(kafkaProducer, "sender");
+		invoke(sender, "wakeup");
+		result.await();
+	}
+
+	private static Enum<?> getEnum(String enumFullName) {
+		String[] x = enumFullName.split("\\.(?=[^\\.]+$)");
+		if (x.length == 2) {
+			String enumClassName = x[0];
+			String enumName = x[1];
+			try {
+				Class<Enum> cl = (Class<Enum>) Class.forName(enumClassName);
+				return Enum.valueOf(cl, enumName);
+			} catch (ClassNotFoundException e) {
+				throw new RuntimeException("Incompatible KafkaProducer version", e);
+			}
+		}
+		return null;
+	}
+
+	private static Object invoke(Object object, String methodName, Object... args) {
+		Class<?>[] argTypes = new Class[args.length];
+		for (int i = 0; i < args.length; i++) {
+			argTypes[i] = args[i].getClass();
+		}
+		return invoke(object, methodName, argTypes, args);
+	}
+
+	private static Object invoke(Object object, String methodName, Class<?>[] argTypes, Object[] args) {
+		try {
+			Method method = object.getClass().getDeclaredMethod(methodName, argTypes);
+			method.setAccessible(true);
+			return method.invoke(object, args);
+		} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
+			throw new RuntimeException("Incompatible KafkaProducer version", e);
+		}
+	}
+
+	private static Object getValue(Object object, String fieldName) {
+		return getValue(object, object.getClass(), fieldName);
+	}
+
+	private static Object getValue(Object object, Class<?> clazz, String fieldName) {
+		try {
+			Field field = clazz.getDeclaredField(fieldName);
+			field.setAccessible(true);
+			return field.get(object);
+		} catch (NoSuchFieldException | IllegalAccessException e) {
+			throw new RuntimeException("Incompatible KafkaProducer version", e);
+		}
+	}
+
+	private static void setValue(Object object, String fieldName, Object value) {
+		try {
+			Field field = object.getClass().getDeclaredField(fieldName);
+			field.setAccessible(true);
+			field.set(object, value);
+		} catch (NoSuchFieldException | IllegalAccessException e) {
+			throw new RuntimeException("Incompatible KafkaProducer version", e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d20728ba/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTests.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTests.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTests.java
new file mode 100644
index 0000000..18bbd8f
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTests.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for our own {@link FlinkKafkaProducer}.
+ */
+@SuppressWarnings("serial")
+public class FlinkKafkaProducerTests extends KafkaTestBase {
+	protected String transactionalId;
+	protected Properties extraProperties;
+
+	@Before
+	public void before() {
+		transactionalId = UUID.randomUUID().toString();
+		extraProperties = new Properties();
+		extraProperties.putAll(standardProps);
+		extraProperties.put("transactional.id", transactionalId);
+		extraProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+		extraProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+		extraProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+		extraProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+		extraProperties.put("isolation.level", "read_committed");
+	}
+
+	@Test(timeout = 30000L)
+	public void testHappyPath() throws IOException {
+		String topicName = "flink-kafka-producer-happy-path";
+		try (Producer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties)) {
+			kafkaProducer.initTransactions();
+			kafkaProducer.beginTransaction();
+			kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
+			kafkaProducer.commitTransaction();
+		}
+		assertRecord(topicName, "42", "42");
+		deleteTestTopic(topicName);
+	}
+
+	@Test(timeout = 30000L)
+	public void testResumeTransaction() throws IOException {
+		String topicName = "flink-kafka-producer-resume-transaction";
+		try (FlinkKafkaProducer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties)) {
+			kafkaProducer.initTransactions();
+			kafkaProducer.beginTransaction();
+			kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
+			kafkaProducer.flush();
+			long producerId = kafkaProducer.getProducerId();
+			short epoch = kafkaProducer.getEpoch();
+
+			try (FlinkKafkaProducer<String, String> resumeProducer = new FlinkKafkaProducer<>(extraProperties)) {
+				resumeProducer.resumeTransaction(producerId, epoch);
+				resumeProducer.commitTransaction();
+			}
+
+			assertRecord(topicName, "42", "42");
+
+			// this shouldn't throw - in case of network split, old producer might attempt to commit it's transaction
+			kafkaProducer.commitTransaction();
+
+			// this shouldn't fail also, for same reason as above
+			try (FlinkKafkaProducer<String, String> resumeProducer = new FlinkKafkaProducer<>(extraProperties)) {
+				resumeProducer.resumeTransaction(producerId, epoch);
+				resumeProducer.commitTransaction();
+			}
+		}
+		deleteTestTopic(topicName);
+	}
+
+	private void assertRecord(String topicName, String expectedKey, String expectedValue) {
+		try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(extraProperties)) {
+			kafkaConsumer.subscribe(Collections.singletonList(topicName));
+			ConsumerRecords<String, String> records = kafkaConsumer.poll(10000);
+
+			ConsumerRecord<String, String> record = Iterables.getOnlyElement(records);
+			assertEquals(expectedKey, record.key());
+			assertEquals(expectedValue, record.value());
+		}
+	}
+}


[9/9] flink git commit: [FLINK-6988][kafka] Add flink-connector-kafka-0.11 with exactly-once semantic

Posted by al...@apache.org.
[FLINK-6988][kafka] Add flink-connector-kafka-0.11 with exactly-once semantic


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2f651e9a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2f651e9a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2f651e9a

Branch: refs/heads/master
Commit: 2f651e9a69a9929ef154e7bf6fcba624b0e8b9a1
Parents: d20728b
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Fri Jun 23 09:14:28 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Oct 9 18:58:36 2017 +0200

----------------------------------------------------------------------
 docs/dev/connectors/kafka.md                    |   82 ++
 .../kafka/Kafka010ProducerITCase.java           |    9 +
 .../connectors/kafka/FlinkKafkaConsumer011.java |  113 ++
 .../connectors/kafka/FlinkKafkaProducer011.java | 1039 ++++++++++++++++++
 .../kafka/Kafka011AvroTableSource.java          |   58 +
 .../kafka/Kafka011JsonTableSource.java          |   53 +
 .../connectors/kafka/Kafka011TableSource.java   |   55 +
 .../metrics/KafkaMetricMuttableWrapper.java     |   43 +
 .../kafka/FlinkKafkaProducer011Tests.java       |  366 ++++++
 .../kafka/Kafka011AvroTableSourceTest.java      |   54 +
 .../connectors/kafka/Kafka011ITCase.java        |  353 ++++++
 .../kafka/Kafka011JsonTableSourceTest.java      |   49 +
 .../Kafka011ProducerAtLeastOnceITCase.java      |   44 +
 .../Kafka011ProducerExactlyOnceITCase.java      |   51 +
 .../kafka/KafkaTestEnvironmentImpl.java         |  497 +++++++++
 .../connectors/kafka/Kafka08ProducerITCase.java |    9 +
 .../connectors/kafka/Kafka09ProducerITCase.java |   10 +
 .../connectors/kafka/KafkaConsumerTestBase.java |    2 +-
 .../connectors/kafka/KafkaProducerTestBase.java |  100 +-
 .../connectors/kafka/KafkaTestBase.java         |   84 ++
 .../kafka/testutils/IntegerSource.java          |  130 +++
 21 files changed, 3170 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index f95c8c0..aabb1ba 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -72,6 +72,14 @@ For most users, the `FlinkKafkaConsumer08` (part of `flink-connector-kafka`) is
         <td>0.10.x</td>
         <td>This connector supports <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message">Kafka messages with timestamps</a> both for producing and consuming.</td>
     </tr>
+    <tr>
+        <td>flink-connector-kafka-0.11_2.11</td>
+        <td>1.4.0</td>
+        <td>FlinkKafkaConsumer011<br>
+        FlinkKafkaProducer011</td>
+        <td>0.11.x</td>
+        <td>Since 0.11.x Kafka does not support scala 2.10. This connector supports <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging">Kafka transactional messaging</a> to provide exactly once semantic for the producer.</td>
+    </tr>
   </tbody>
 </table>
 
@@ -518,6 +526,80 @@ into a Kafka topic.
   for more explanation.
 </div>
 
+#### Kafka 0.11
+
+With Flink's checkpointing enabled, the `FlinkKafkaProducer011` can provide
+exactly-once delivery guarantees.
+
+Besides enabling Flink's checkpointing, you can also choose three different modes of operating
+chosen by passing appropriate `semantic` parameter to the `FlinkKafkaProducer011`:
+
+ * `Semantic.NONE`: Flink will not guarantee anything. Produced records can be lost or they can
+ be duplicated.
+ * `Semantic.AT_LEAST_ONCE` (default setting): similar to `setFlushOnCheckpoint(true)` in
+ `FlinkKafkaProducer010`. his guarantees that no records will be lost (although they can be duplicated).
+ * `Semantic.EXACTLY_ONCE`: uses Kafka transactions to provide exactly-once semantic.
+
+<div class="alert alert-warning">
+  <strong>Attention:</strong> Depending on your Kafka configuration, even after Kafka acknowledges
+  writes you can still experience data losses. In particular keep in mind about following properties
+  in Kafka config:
+  <ul>
+    <li><tt>acks</tt></li>
+    <li><tt>log.flush.interval.messages</tt></li>
+    <li><tt>log.flush.interval.ms</tt></li>
+    <li><tt>log.flush.*</tt></li>
+  </ul>
+  Default values for the above options can easily lead to data loss. Please refer to the Kafka documentation
+  for more explanation.
+</div>
+
+
+##### Caveats
+
+`Semantic.EXACTLY_ONCE` mode relies on the ability to commit transactions
+that were started before taking a checkpoint, after recovering from the said checkpoint. If the time
+between Flink application crash and completed restart is larger then Kafka's transaction timeout
+there will be data loss (Kafka will automatically abort transactions that exceeded timeout time).
+Having this in mind, please configure your transaction timeout appropriately to your expected down
+times.
+
+Kafka brokers by default have `transaction.max.timeout.ms` set to 15 minutes. This property will
+not allow to set transaction timeouts for the producers larger then it's value.
+`FlinkKafkaProducer011` by default sets the `transaction.timeout.ms` property in producer config to
+1 hour, thus `transaction.max.timeout.ms` should be increased before using the
+`Semantic.EXACTLY_ONCE` mode.
+
+In `read_committed` mode of `KafkaConsumer`, any transactions that were not finished
+(neither aborted nor completed) will block all reads from the given Kafka topic past any
+un-finished transaction. In other words after following sequence of events:
+
+1. User started `transaction1` and written some records using it
+2. User started `transaction2` and written some further records using it
+3. User committed `transaction2`
+
+Even if records from `transaction2` are already committed, they will not be visible to
+the consumers until `transaction1` is committed or aborted. This hastwo implications:
+
+ * First of all, during normal working of Flink applications, user can expect a delay in visibility
+ of the records produced into Kafka topics, equal to average time between completed checkpoints.
+ * Secondly in case of Flink application failure, topics into which this application was writting, 
+ will be blocked for the readers until the application restarts or the configured transaction 
+ timeout time will pass. This remark only applies for the cases when there are multiple
+ agents/applications writing to the same Kafka topic.
+
+**Note**:  `Semantic.EXACTLY_ONCE` mode uses a fixed size pool of KafkaProducers
+per each `FlinkKafkaProducer011` instance. One of each of those producers is used per one
+checkpoint. If the number of concurrent checkpoints exceeds the pool size, `FlinkKafkaProducer011`
+will throw an exception and will fail the whole application. Please configure max pool size and max
+number of concurrent checkpoints accordingly.
+
+**Note**: `Semantic.EXACTLY_ONCE` takes all possible measures to not leave any lingering transactions
+that would block the consumers from reading from Kafka topic more then it is necessary. However in the
+event of failure of Flink application before first checkpoint, after restarting such application there
+is no information in the system about previous pool sizes. Thus it is unsafe to scale down Flink
+application before first checkpoint completes, by factor larger then `FlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTOR`.
+
 ## Using Kafka timestamps and Flink event time in Kafka 0.10
 
 Since Apache Kafka 0.10+, Kafka's messages can carry [timestamps](https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message), indicating

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
index f811893..cf35a59 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ProducerITCase.java
@@ -23,4 +23,13 @@ package org.apache.flink.streaming.connectors.kafka;
  */
 @SuppressWarnings("serial")
 public class Kafka010ProducerITCase extends KafkaProducerTestBase {
+	@Override
+	public void testExactlyOnceRegularSink() throws Exception {
+		// Kafka010 does not support exactly once semantic
+	}
+
+	@Override
+	public void testExactlyOnceCustomOperator() throws Exception {
+		// Kafka010 does not support exactly once semantic
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java
new file mode 100644
index 0000000..8d165c3
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
+ * Apache Kafka 0.11.x. The consumer can run in multiple parallel instances, each of which will pull
+ * data from one or more Kafka partitions.
+ *
+ * <p>The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost
+ * during a failure, and that the computation processes elements "exactly once".
+ * (Note: These guarantees naturally assume that Kafka itself does not loose any data.)</p>
+ *
+ * <p>Please note that Flink snapshots the offsets internally as part of its distributed checkpoints. The offsets
+ * committed to Kafka / ZooKeeper are only to bring the outside view of progress in sync with Flink's view
+ * of the progress. That way, monitoring and other jobs can get a view of how far the Flink Kafka consumer
+ * has consumed a topic.</p>
+ *
+ * <p>Please refer to Kafka's documentation for the available configuration properties:
+ * http://kafka.apache.org/documentation.html#newconsumerconfigs</p>
+ */
+public class FlinkKafkaConsumer011<T> extends FlinkKafkaConsumer010<T> {
+
+	private static final long serialVersionUID = 2324564345203409112L;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new Kafka streaming source consumer for Kafka 0.11.x.
+	 *
+	 * @param topic
+	 *           The name of the topic that should be consumed.
+	 * @param valueDeserializer
+	 *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+	 * @param props
+	 *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
+	 */
+	public FlinkKafkaConsumer011(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
+		this(Collections.singletonList(topic), valueDeserializer, props);
+	}
+
+	/**
+	 * Creates a new Kafka streaming source consumer for Kafka 0.11.x
+	 *
+	 * <p>This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value
+	 * pairs, offsets, and topic names from Kafka.
+	 *
+	 * @param topic
+	 *           The name of the topic that should be consumed.
+	 * @param deserializer
+	 *           The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+	 * @param props
+	 *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
+	 */
+	public FlinkKafkaConsumer011(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) {
+		this(Collections.singletonList(topic), deserializer, props);
+	}
+
+	/**
+	 * Creates a new Kafka streaming source consumer for Kafka 0.11.x
+	 *
+	 * <p>This constructor allows passing multiple topics to the consumer.
+	 *
+	 * @param topics
+	 *           The Kafka topics to read from.
+	 * @param deserializer
+	 *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+	 * @param props
+	 *           The properties that are used to configure both the fetcher and the offset handler.
+	 */
+	public FlinkKafkaConsumer011(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
+		this(topics, new KeyedDeserializationSchemaWrapper<>(deserializer), props);
+	}
+
+	/**
+	 * Creates a new Kafka streaming source consumer for Kafka 0.11.x
+	 *
+	 * <p>This constructor allows passing multiple topics and a key/value deserialization schema.
+	 *
+	 * @param topics
+	 *           The Kafka topics to read from.
+	 * @param deserializer
+	 *           The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+	 * @param props
+	 *           The properties that are used to configure both the fetcher and the offset handler.
+	 */
+	public FlinkKafkaConsumer011(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
+		super(topics, deserializer, props);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
new file mode 100644
index 0000000..67e237d
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -0,0 +1,1039 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
+import org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMuttableWrapper;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.LongStream;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.11.x. By default producer
+ * will use {@link Semantic#AT_LEAST_ONCE} semantic. Before using {@link Semantic#EXACTLY_ONCE} please refer to Flink's
+ * Kafka connector documentation.
+ */
+public class FlinkKafkaProducer011<IN>
+		extends TwoPhaseCommitSinkFunction<IN, FlinkKafkaProducer011.KafkaTransactionState, FlinkKafkaProducer011.KafkaTransactionContext> {
+
+	/**
+	 *  Semantics that can be chosen.
+	 *  <li>{@link #EXACTLY_ONCE}</li>
+	 *  <li>{@link #AT_LEAST_ONCE}</li>
+	 *  <li>{@link #NONE}</li>
+	 */
+	public enum Semantic {
+
+		/**
+		 * Semantic.EXACTLY_ONCE the Flink producer will write all messages in a Kafka transaction that will be
+		 * committed to the Kafka on a checkpoint.
+		 *
+		 * <p>In this mode {@link FlinkKafkaProducer011} sets up a pool of {@link FlinkKafkaProducer}. Between each
+		 * checkpoint there is created new Kafka transaction, which is being committed on
+		 * {@link FlinkKafkaProducer011#notifyCheckpointComplete(long)}. If checkpoint complete notifications are
+		 * running late, {@link FlinkKafkaProducer011} can run out of {@link FlinkKafkaProducer}s in the pool. In that
+		 * case any subsequent {@link FlinkKafkaProducer011#snapshotState(FunctionSnapshotContext)} requests will fail
+		 * and {@link FlinkKafkaProducer011} will keep using the {@link FlinkKafkaProducer} from previous checkpoint.
+		 * To decrease chances of failing checkpoints there are three options:
+		 * <li>decrease number of max concurrent checkpoints</li>
+		 * <li>make checkpoints more reliable (so that they complete faster)</li>
+		 * <li>increase delay between checkpoints</li>
+		 * <li>increase size of {@link FlinkKafkaProducer}s pool</li>
+		 */
+		EXACTLY_ONCE,
+
+		/**
+		 * Semantic.AT_LEAST_ONCE the Flink producer will wait for all outstanding messages in the Kafka buffers
+		 * to be acknowledged by the Kafka producer on a checkpoint.
+		 */
+		AT_LEAST_ONCE,
+
+		/**
+		 * Semantic.NONE means that nothing will be guaranteed. Messages can be lost and/or duplicated in case
+		 * of failure.
+		 */
+		NONE
+	}
+
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducerBase.class);
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * This coefficient determines what is the safe scale down factor.
+	 *
+	 * <p>If the Flink application previously failed before first checkpoint completed or we are starting new batch
+	 * of {@link FlinkKafkaProducer011} from scratch without clean shutdown of the previous one,
+	 * {@link FlinkKafkaProducer011} doesn't know what was the set of previously used Kafka's transactionalId's. In
+	 * that case, it will try to play safe and abort all of the possible transactionalIds from the range of:
+	 * {@code [0, getNumberOfParallelSubtasks() * kafkaProducersPoolSize * SAFE_SCALE_DOWN_FACTOR) }
+	 *
+	 * <p>The range of available to use transactional ids is:
+	 * {@code [0, getNumberOfParallelSubtasks() * kafkaProducersPoolSize) }
+	 *
+	 * <p>This means that if we decrease {@code getNumberOfParallelSubtasks()} by a factor larger then
+	 * {@code SAFE_SCALE_DOWN_FACTOR} we can have a left some lingering transaction.
+	 */
+	public static final int SAFE_SCALE_DOWN_FACTOR = 5;
+
+	/**
+	 * Default number of KafkaProducers in the pool. See {@link Semantic#EXACTLY_ONCE}.
+	 */
+	public static final int DEFAULT_KAFKA_PRODUCERS_POOL_SIZE = 5;
+
+	/**
+	 * Default value for kafka transaction timeout.
+	 */
+	public static final Time DEFAULT_KAFKA_TRANSACTION_TIMEOUT = Time.hours(1);
+
+	/**
+	 * Configuration key for disabling the metrics reporting.
+	 */
+	public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
+
+	/**
+	 * Descriptor of the transacionalIds list.
+	 */
+	private static final ListStateDescriptor<NextTransactionalIdHint> NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR =
+		new ListStateDescriptor<>("next-transactional-id-hint", TypeInformation.of(NextTransactionalIdHint.class));
+
+	/**
+	 * State for nextTransactionalIdHint.
+	 */
+	private transient ListState<NextTransactionalIdHint> nextTransactionalIdHintState;
+
+	/**
+	 * Hint for picking next transactional id.
+	 */
+	private NextTransactionalIdHint nextTransactionalIdHint;
+
+	/**
+	 * User defined properties for the Producer.
+	 */
+	private final Properties producerConfig;
+
+	/**
+	 * The name of the default topic this producer is writing data to.
+	 */
+	private final String defaultTopicId;
+
+	/**
+	 * (Serializable) SerializationSchema for turning objects used with Flink into.
+	 * byte[] for Kafka.
+	 */
+	private final KeyedSerializationSchema<IN> schema;
+
+	/**
+	 * User-provided partitioner for assigning an object to a Kafka partition for each topic.
+	 */
+	private final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner;
+
+	/**
+	 * Partitions of each topic.
+	 */
+	private final Map<String, int[]> topicPartitionsMap;
+
+	/**
+	 * Max number of producers in the pool. If all producers are in use, snapshoting state will throw an exception.
+	 */
+	private final int kafkaProducersPoolSize;
+
+	/**
+	 * Available transactional ids.
+	 */
+	private final BlockingDeque<String> availableTransactionalIds = new LinkedBlockingDeque<>();
+
+	/**
+	 * Pool of KafkaProducers objects.
+	 */
+	private transient ProducersPool producersPool = new ProducersPool();
+
+	/**
+	 * Flag controlling whether we are writing the Flink record's timestamp into Kafka.
+	 */
+	private boolean writeTimestampToKafka = false;
+
+	/**
+	 * Flag indicating whether to accept failures (and log them), or to fail on failures.
+	 */
+	private boolean logFailuresOnly;
+
+	/**
+	 * Semantic chosen for this instance.
+	 */
+	private Semantic semantic;
+
+	// -------------------------------- Runtime fields ------------------------------------------
+
+	/** The callback than handles error propagation or logging callbacks. */
+	@Nullable
+	private transient Callback callback;
+
+	/** Errors encountered in the async producer are stored here. */
+	@Nullable
+	private transient volatile Exception asyncException;
+
+	/** Lock for accessing the pending records. */
+	private final SerializableObject pendingRecordsLock = new SerializableObject();
+
+	/** Number of unacknowledged records. */
+	private final AtomicLong pendingRecords = new AtomicLong();
+
+	/** Cache of metrics to replace already registered metrics instead of overwriting existing ones. */
+	private final Map<String, KafkaMetricMuttableWrapper> previouslyCreatedMetrics = new HashMap<>();
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * @param brokerList
+	 *			Comma separated addresses of the brokers
+	 * @param topicId
+	 * 			ID of the Kafka topic.
+	 * @param serializationSchema
+	 * 			User defined (keyless) serialization schema.
+	 */
+	public FlinkKafkaProducer011(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
+		this(
+			topicId,
+			new KeyedSerializationSchemaWrapper<>(serializationSchema),
+			getPropertiesFromBrokerList(brokerList),
+			Optional.of(new FlinkFixedPartitioner<IN>()));
+	}
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
+	 * the topic.
+	 *
+	 * @param topicId
+	 * 			ID of the Kafka topic.
+	 * @param serializationSchema
+	 * 			User defined (keyless) serialization schema.
+	 * @param producerConfig
+	 * 			Properties with the producer configuration.
+	 */
+	public FlinkKafkaProducer011(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
+		this(
+			topicId,
+			new KeyedSerializationSchemaWrapper<>(serializationSchema),
+			producerConfig,
+			Optional.of(new FlinkFixedPartitioner<IN>()));
+	}
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
+	 * the topic.
+	 *
+	 * @param topicId The topic to write data to
+	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+	 */
+	public FlinkKafkaProducer011(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+	}
+
+	// ------------------- Key/Value serialization schema constructors ----------------------
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * @param brokerList
+	 *			Comma separated addresses of the brokers
+	 * @param topicId
+	 * 			ID of the Kafka topic.
+	 * @param serializationSchema
+	 * 			User defined serialization schema supporting key/value messages
+	 */
+	public FlinkKafkaProducer011(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
+		this(
+			topicId,
+			serializationSchema,
+			getPropertiesFromBrokerList(brokerList),
+			Optional.of(new FlinkFixedPartitioner<IN>()));
+	}
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * @param topicId
+	 * 			ID of the Kafka topic.
+	 * @param serializationSchema
+	 * 			User defined serialization schema supporting key/value messages
+	 * @param producerConfig
+	 * 			Properties with the producer configuration.
+	 */
+	public FlinkKafkaProducer011(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
+		this(
+			topicId,
+			serializationSchema,
+			producerConfig,
+			Optional.of(new FlinkFixedPartitioner<IN>()));
+	}
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * @param topicId
+	 * 			ID of the Kafka topic.
+	 * @param serializationSchema
+	 * 			User defined serialization schema supporting key/value messages
+	 * @param producerConfig
+	 * 			Properties with the producer configuration.
+	 */
+	public FlinkKafkaProducer011(
+			String topicId,
+			KeyedSerializationSchema<IN> serializationSchema,
+			Properties producerConfig,
+			Semantic semantic) {
+		this(topicId,
+			serializationSchema,
+			producerConfig,
+			Optional.of(new FlinkFixedPartitioner<IN>()),
+			semantic,
+			DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
+	}
+
+
+	/**
+	 * The main constructor for creating a FlinkKafkaProducer.
+	 *
+	 * @param defaultTopicId The default topic to write data to
+	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner.
+	 */
+	public FlinkKafkaProducer011(
+			String defaultTopicId,
+			KeyedSerializationSchema<IN> serializationSchema,
+			Properties producerConfig,
+			Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
+		this(
+			defaultTopicId,
+			serializationSchema,
+			producerConfig,
+			customPartitioner,
+			Semantic.AT_LEAST_ONCE,
+			DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
+	}
+
+	/**
+	 * The main constructor for creating a FlinkKafkaProducer.
+	 *
+	 * @param defaultTopicId The default topic to write data to
+	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner.
+	 * @param semantic Defines semantic that will be used by this producer (see {@link Semantic}).
+	 * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link Semantic#EXACTLY_ONCE}).
+	 */
+	public FlinkKafkaProducer011(
+			String defaultTopicId,
+			KeyedSerializationSchema<IN> serializationSchema,
+			Properties producerConfig,
+			Optional<FlinkKafkaPartitioner<IN>> customPartitioner,
+			Semantic semantic,
+			int kafkaProducersPoolSize) {
+		super(TypeInformation.of(new TypeHint<State<KafkaTransactionState, KafkaTransactionContext>>() {}));
+
+		this.defaultTopicId = checkNotNull(defaultTopicId, "defaultTopicId is null");
+		this.schema = checkNotNull(serializationSchema, "serializationSchema is null");
+		this.producerConfig = checkNotNull(producerConfig, "producerConfig is null");
+		this.flinkKafkaPartitioner = checkNotNull(customPartitioner, "customPartitioner is null").orElse(null);
+		this.semantic = checkNotNull(semantic, "semantic is null");
+		this.kafkaProducersPoolSize = kafkaProducersPoolSize;
+		checkState(kafkaProducersPoolSize > 0, "kafkaProducersPoolSize must be non empty");
+
+		ClosureCleaner.clean(this.flinkKafkaPartitioner, true);
+		ClosureCleaner.ensureSerializable(serializationSchema);
+
+		// set the producer configuration properties for kafka record key value serializers.
+		if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
+			this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
+		} else {
+			LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
+		}
+
+		if (!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
+			this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
+		} else {
+			LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+		}
+
+		// eagerly ensure that bootstrap servers are set.
+		if (!this.producerConfig.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
+			throw new IllegalArgumentException(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be supplied in the producer config properties.");
+		}
+
+		if (!producerConfig.contains(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)) {
+			long timeout = DEFAULT_KAFKA_TRANSACTION_TIMEOUT.toMilliseconds();
+			checkState(timeout < Integer.MAX_VALUE && timeout > 0, "timeout does not fit into 32 bit integer");
+			this.producerConfig.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, (int) timeout);
+			LOG.warn("Property [%s] not specified. Setting it to %s", ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, DEFAULT_KAFKA_TRANSACTION_TIMEOUT);
+		}
+
+		this.topicPartitionsMap = new HashMap<>();
+	}
+
+	// ---------------------------------- Properties --------------------------
+
+	/**
+	 * If set to true, Flink will write the (event time) timestamp attached to each record into Kafka.
+	 * Timestamps must be positive for Kafka to accept them.
+	 *
+	 * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka.
+	 */
+	public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
+		this.writeTimestampToKafka = writeTimestampToKafka;
+	}
+
+	/**
+	 * Defines whether the producer should fail on errors, or only log them.
+	 * If this is set to true, then exceptions will be only logged, if set to false,
+	 * exceptions will be eventually thrown and cause the streaming program to
+	 * fail (and enter recovery).
+	 *
+	 * @param logFailuresOnly The flag to indicate logging-only on exceptions.
+	 */
+	public void setLogFailuresOnly(boolean logFailuresOnly) {
+		this.logFailuresOnly = logFailuresOnly;
+	}
+
+	// ----------------------------------- Utilities --------------------------
+
+	/**
+	 * Initializes the connection to Kafka.
+	 */
+	@Override
+	public void open(Configuration configuration) throws Exception {
+		if (semantic != Semantic.NONE && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
+			LOG.warn("Using {} semantic, but checkpointing is not enabled. Switching to {} semantic.", semantic, Semantic.NONE);
+			semantic = Semantic.NONE;
+		}
+
+		if (logFailuresOnly) {
+			callback = new Callback() {
+				@Override
+				public void onCompletion(RecordMetadata metadata, Exception e) {
+					if (e != null) {
+						LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
+					}
+					acknowledgeMessage();
+				}
+			};
+		}
+		else {
+			callback = new Callback() {
+				@Override
+				public void onCompletion(RecordMetadata metadata, Exception exception) {
+					if (exception != null && asyncException == null) {
+						asyncException = exception;
+					}
+					acknowledgeMessage();
+				}
+			};
+		}
+
+		super.open(configuration);
+	}
+
+	@Override
+	public void invoke(KafkaTransactionState transaction, IN next, Context context) throws Exception {
+		checkErroneous();
+
+		byte[] serializedKey = schema.serializeKey(next);
+		byte[] serializedValue = schema.serializeValue(next);
+		String targetTopic = schema.getTargetTopic(next);
+		if (targetTopic == null) {
+			targetTopic = defaultTopicId;
+		}
+
+		Long timestamp = null;
+		if (this.writeTimestampToKafka) {
+			timestamp = context.timestamp();
+		}
+
+		ProducerRecord<byte[], byte[]> record;
+		int[] partitions = topicPartitionsMap.get(targetTopic);
+		if (null == partitions) {
+			partitions = getPartitionsByTopic(targetTopic, transaction.producer);
+			topicPartitionsMap.put(targetTopic, partitions);
+		}
+		if (flinkKafkaPartitioner != null) {
+			record = new ProducerRecord<>(
+				targetTopic,
+				flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions),
+				timestamp,
+				serializedKey,
+				serializedValue);
+		} else {
+			record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
+		}
+		pendingRecords.incrementAndGet();
+		transaction.producer.send(record, callback);
+	}
+
+	@Override
+	public void close() throws Exception {
+		if (currentTransaction != null) {
+			// to avoid exceptions on aborting transactions with some pending records
+			flush(currentTransaction);
+		}
+		try {
+			super.close();
+		}
+		catch (Exception e) {
+			asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
+		}
+		try {
+			producersPool.close();
+		}
+		catch (Exception e) {
+			asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
+		}
+		// make sure we propagate pending errors
+		checkErroneous();
+	}
+
+	// ------------------- Logic for handling checkpoint flushing -------------------------- //
+
+	@Override
+	protected KafkaTransactionState beginTransaction() throws Exception {
+		switch (semantic) {
+			case EXACTLY_ONCE:
+				FlinkKafkaProducer<byte[], byte[]> producer = createOrGetProducerFromPool();
+				producer.beginTransaction();
+				return new KafkaTransactionState(producer.getTransactionalId(), producer);
+			case AT_LEAST_ONCE:
+			case NONE:
+				// Do not create new producer on each beginTransaction() if it is not necessary
+				if (currentTransaction != null && currentTransaction.producer != null) {
+					return new KafkaTransactionState(currentTransaction.producer);
+				}
+				return new KafkaTransactionState(initProducer(true));
+			default:
+				throw new UnsupportedOperationException("Not implemented semantic");
+		}
+	}
+
+	private FlinkKafkaProducer<byte[], byte[]> createOrGetProducerFromPool() throws Exception {
+		FlinkKafkaProducer<byte[], byte[]> producer = producersPool.poll();
+		if (producer == null) {
+			String transactionalId = availableTransactionalIds.poll();
+			if (transactionalId == null) {
+				throw new Exception(
+					"Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checktpoins.");
+			}
+			producer = initTransactionalProducer(transactionalId, true);
+			producer.initTransactions();
+		}
+		return producer;
+	}
+
+	@Override
+	protected void preCommit(KafkaTransactionState transaction) throws Exception {
+		switch (semantic) {
+			case EXACTLY_ONCE:
+			case AT_LEAST_ONCE:
+				flush(transaction);
+				break;
+			case NONE:
+				break;
+			default:
+				throw new UnsupportedOperationException("Not implemented semantic");
+		}
+		checkErroneous();
+	}
+
+	@Override
+	protected void commit(KafkaTransactionState transaction) {
+		switch (semantic) {
+			case EXACTLY_ONCE:
+				transaction.producer.commitTransaction();
+				producersPool.add(transaction.producer);
+				break;
+			case AT_LEAST_ONCE:
+			case NONE:
+				break;
+			default:
+				throw new UnsupportedOperationException("Not implemented semantic");
+		}
+	}
+
+	@Override
+	protected void recoverAndCommit(KafkaTransactionState transaction) {
+		switch (semantic) {
+			case EXACTLY_ONCE:
+				KafkaTransactionState kafkaTransaction = transaction;
+				FlinkKafkaProducer<byte[], byte[]> producer =
+					initTransactionalProducer(kafkaTransaction.transactionalId, false);
+				producer.resumeTransaction(kafkaTransaction.producerId, kafkaTransaction.epoch);
+				try {
+					producer.commitTransaction();
+					producer.close();
+				}
+				catch (InvalidTxnStateException ex) {
+					// That means we have committed this transaction before.
+					LOG.warn("Encountered error {} while recovering transaction {}. " +
+						"Presumably this transaction has been already committed before",
+						ex,
+						transaction);
+				}
+				break;
+			case AT_LEAST_ONCE:
+			case NONE:
+				break;
+			default:
+				throw new UnsupportedOperationException("Not implemented semantic");
+		}
+	}
+
+	@Override
+	protected void abort(KafkaTransactionState transaction) {
+		switch (semantic) {
+			case EXACTLY_ONCE:
+				transaction.producer.abortTransaction();
+				producersPool.add(transaction.producer);
+				break;
+			case AT_LEAST_ONCE:
+			case NONE:
+				producersPool.add(transaction.producer);
+				break;
+			default:
+				throw new UnsupportedOperationException("Not implemented semantic");
+		}
+	}
+
+	@Override
+	protected void recoverAndAbort(KafkaTransactionState transaction) {
+		switch (semantic) {
+			case EXACTLY_ONCE:
+				FlinkKafkaProducer<byte[], byte[]> producer =
+					initTransactionalProducer(transaction.transactionalId, false);
+				producer.resumeTransaction(transaction.producerId, transaction.epoch);
+				producer.abortTransaction();
+				producer.close();
+				break;
+			case AT_LEAST_ONCE:
+			case NONE:
+				break;
+			default:
+				throw new UnsupportedOperationException("Not implemented semantic");
+		}
+	}
+
+	private void acknowledgeMessage() {
+		pendingRecords.decrementAndGet();
+	}
+
+	/**
+	 * Flush pending records.
+	 * @param transaction
+	 */
+	private void flush(KafkaTransactionState transaction) throws Exception {
+		if (transaction.producer != null) {
+			transaction.producer.flush();
+		}
+		long pendingRecordsCount = pendingRecords.get();
+		if (pendingRecordsCount != 0) {
+			throw new IllegalStateException("Pending record count must be zero at this point: " + pendingRecordsCount);
+		}
+
+		// if the flushed requests has errors, we should propagate it also and fail the checkpoint
+		checkErroneous();
+	}
+
+	@Override
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		super.snapshotState(context);
+
+		nextTransactionalIdHintState.clear();
+		// To avoid duplication only first subtask keeps track of next transactional id hint. Otherwise all of the
+		// subtasks would write exactly same information.
+		if (getRuntimeContext().getIndexOfThisSubtask() == 0 && nextTransactionalIdHint != null) {
+			long nextFreeTransactionalId = nextTransactionalIdHint.nextFreeTransactionalId;
+
+			// If we scaled up, some (unknown) subtask must have created new transactional ids from scratch. In that
+			// case we adjust nextFreeTransactionalId by the range of transactionalIds that could be used for this
+			// scaling up.
+			if (getRuntimeContext().getNumberOfParallelSubtasks() > nextTransactionalIdHint.lastParallelism) {
+				nextFreeTransactionalId += getRuntimeContext().getNumberOfParallelSubtasks() * kafkaProducersPoolSize;
+			}
+
+			nextTransactionalIdHintState.add(new NextTransactionalIdHint(
+				getRuntimeContext().getNumberOfParallelSubtasks(),
+				nextFreeTransactionalId));
+		}
+	}
+
+	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
+		nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(
+			NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
+
+		if (semantic != Semantic.EXACTLY_ONCE) {
+			nextTransactionalIdHint = null;
+		} else {
+			ArrayList<NextTransactionalIdHint> transactionalIdHints = Lists.newArrayList(nextTransactionalIdHintState.get());
+			if (transactionalIdHints.size() > 1) {
+				throw new IllegalStateException(
+					"There should be at most one next transactional id hint written by the first subtask");
+			} else if (transactionalIdHints.size() == 0) {
+				nextTransactionalIdHint = new NextTransactionalIdHint(0, 0);
+
+				// this means that this is either:
+				// (1) the first execution of this application
+				// (2) previous execution has failed before first checkpoint completed
+				//
+				// in case of (2) we have to abort all previous transactions, but we don't know was the parallelism used
+				// then, so we must guess using current configured pool size, current parallelism and
+				// SAFE_SCALE_DOWN_FACTOR
+				long abortTransactionalIdStart = getRuntimeContext().getIndexOfThisSubtask();
+				long abortTransactionalIdEnd = abortTransactionalIdStart + 1;
+
+				abortTransactionalIdStart *= kafkaProducersPoolSize * SAFE_SCALE_DOWN_FACTOR;
+				abortTransactionalIdEnd *= kafkaProducersPoolSize * SAFE_SCALE_DOWN_FACTOR;
+				abortTransactions(LongStream.range(abortTransactionalIdStart, abortTransactionalIdEnd));
+			} else {
+				nextTransactionalIdHint = transactionalIdHints.get(0);
+			}
+		}
+
+		super.initializeState(context);
+	}
+
+	@Override
+	protected Optional<KafkaTransactionContext> initializeUserContext() {
+		if (semantic != Semantic.EXACTLY_ONCE) {
+			return Optional.empty();
+		}
+
+		Set<String> transactionalIds = generateNewTransactionalIds();
+		resetAvailableTransactionalIdsPool(transactionalIds);
+		return Optional.of(new KafkaTransactionContext(transactionalIds));
+	}
+
+	private Set<String> generateNewTransactionalIds() {
+		Preconditions.checkState(nextTransactionalIdHint != null,
+			"nextTransactionalIdHint must be present for EXACTLY_ONCE");
+
+		// range of available transactional ids is:
+		// [nextFreeTransactionalId, nextFreeTransactionalId + parallelism * kafkaProducersPoolSize)
+		// loop below picks in a deterministic way a subrange of those available transactional ids based on index of
+		// this subtask
+		int subtaskId = getRuntimeContext().getIndexOfThisSubtask();
+		Set<String> transactionalIds = new HashSet<>();
+		for (int i = 0; i < kafkaProducersPoolSize; i++) {
+			long transactionalId = nextTransactionalIdHint.nextFreeTransactionalId + subtaskId * kafkaProducersPoolSize + i;
+			transactionalIds.add(generateTransactionalId(transactionalId));
+		}
+		LOG.info("Generated new transactionalIds {}", transactionalIds);
+		return transactionalIds;
+	}
+
+	@Override
+	protected void finishRecoveringContext() {
+		cleanUpUserContext();
+		resetAvailableTransactionalIdsPool(getUserContext().get().transactionalIds);
+		LOG.info("Recovered transactionalIds {}", getUserContext().get().transactionalIds);
+	}
+
+	/**
+	 * After initialization make sure that all previous transactions from the current user context have been completed.
+	 */
+	private void cleanUpUserContext() {
+		if (!getUserContext().isPresent()) {
+			return;
+		}
+		abortTransactions(getUserContext().get().transactionalIds.stream());
+	}
+
+	private void resetAvailableTransactionalIdsPool(Collection<String> transactionalIds) {
+		availableTransactionalIds.clear();
+		for (String transactionalId : transactionalIds) {
+			availableTransactionalIds.add(transactionalId);
+		}
+	}
+
+	// ----------------------------------- Utilities --------------------------
+
+	private void abortTransactions(LongStream transactionalIds) {
+		abortTransactions(transactionalIds.mapToObj(this::generateTransactionalId));
+	}
+
+	private void abortTransactions(Stream<String> transactionalIds) {
+		transactionalIds.forEach(transactionalId -> {
+			try (FlinkKafkaProducer<byte[], byte[]> kafkaProducer =
+					initTransactionalProducer(transactionalId, false)) {
+				kafkaProducer.initTransactions();
+			}
+		});
+	}
+
+	private String generateTransactionalId(long transactionalId) {
+		String transactionalIdFormat = getRuntimeContext().getTaskName() + "-%d";
+		return String.format(transactionalIdFormat, transactionalId);
+	}
+
+	int getTransactionCoordinatorId() {
+		if (currentTransaction == null || currentTransaction.producer == null) {
+			throw new IllegalArgumentException();
+		}
+		return currentTransaction.producer.getTransactionCoordinatorId();
+	}
+
+	private FlinkKafkaProducer<byte[], byte[]> initTransactionalProducer(String transactionalId, boolean registerMetrics) {
+		producerConfig.put("transactional.id", transactionalId);
+		return initProducer(registerMetrics);
+	}
+
+	private FlinkKafkaProducer<byte[], byte[]> initProducer(boolean registerMetrics) {
+		FlinkKafkaProducer<byte[], byte[]> producer = new FlinkKafkaProducer<>(this.producerConfig);
+
+		RuntimeContext ctx = getRuntimeContext();
+
+		if (flinkKafkaPartitioner != null) {
+			if (flinkKafkaPartitioner instanceof FlinkKafkaDelegatePartitioner) {
+				((FlinkKafkaDelegatePartitioner) flinkKafkaPartitioner).setPartitions(
+					getPartitionsByTopic(this.defaultTopicId, producer));
+			}
+			flinkKafkaPartitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
+		}
+
+		LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into default topic {}",
+			ctx.getIndexOfThisSubtask() + 1, ctx.getNumberOfParallelSubtasks(), defaultTopicId);
+
+		// register Kafka metrics to Flink accumulators
+		if (registerMetrics && !Boolean.parseBoolean(producerConfig.getProperty(KEY_DISABLE_METRICS, "false"))) {
+			Map<MetricName, ? extends Metric> metrics = producer.metrics();
+
+			if (metrics == null) {
+				// MapR's Kafka implementation returns null here.
+				LOG.info("Producer implementation does not support metrics");
+			} else {
+				final MetricGroup kafkaMetricGroup = getRuntimeContext().getMetricGroup().addGroup("KafkaProducer");
+				for (Map.Entry<MetricName, ? extends Metric> entry: metrics.entrySet()) {
+					String name = entry.getKey().name();
+					Metric metric = entry.getValue();
+
+					KafkaMetricMuttableWrapper wrapper = previouslyCreatedMetrics.get(name);
+					if (wrapper != null) {
+						wrapper.setKafkaMetric(metric);
+					} else {
+						// TODO: somehow merge metrics from all active producers?
+						wrapper = new KafkaMetricMuttableWrapper(metric);
+						previouslyCreatedMetrics.put(name, wrapper);
+						kafkaMetricGroup.gauge(name, wrapper);
+					}
+				}
+			}
+		}
+		return producer;
+	}
+
+	private void checkErroneous() throws Exception {
+		Exception e = asyncException;
+		if (e != null) {
+			// prevent double throwing
+			asyncException = null;
+			throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
+		}
+	}
+
+	private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+		producersPool = new ProducersPool();
+	}
+
+	private static Properties getPropertiesFromBrokerList(String brokerList) {
+		String[] elements = brokerList.split(",");
+
+		// validate the broker addresses
+		for (String broker: elements) {
+			NetUtils.getCorrectHostnamePort(broker);
+		}
+
+		Properties props = new Properties();
+		props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+		return props;
+	}
+
+	private static int[] getPartitionsByTopic(String topic, Producer<byte[], byte[]> producer) {
+		// the fetched list is immutable, so we're creating a mutable copy in order to sort it
+		List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic));
+
+		// sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
+		Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
+			@Override
+			public int compare(PartitionInfo o1, PartitionInfo o2) {
+				return Integer.compare(o1.partition(), o2.partition());
+			}
+		});
+
+		int[] partitions = new int[partitionsList.size()];
+		for (int i = 0; i < partitions.length; i++) {
+			partitions[i] = partitionsList.get(i).partition();
+		}
+
+		return partitions;
+	}
+
+	/**
+	 * State for handling transactions.
+	 */
+	public static class KafkaTransactionState {
+
+		private final transient FlinkKafkaProducer<byte[], byte[]> producer;
+
+		@Nullable
+		public final String transactionalId;
+
+		public final long producerId;
+
+		public final short epoch;
+
+		public KafkaTransactionState(String transactionalId, FlinkKafkaProducer<byte[], byte[]> producer) {
+			this.producer = producer;
+			this.transactionalId = transactionalId;
+			this.producerId = producer.getProducerId();
+			this.epoch = producer.getEpoch();
+		}
+
+		public KafkaTransactionState(FlinkKafkaProducer<byte[], byte[]> producer) {
+			this.producer = producer;
+			this.transactionalId = null;
+			this.producerId = -1;
+			this.epoch = -1;
+		}
+
+		@Override
+		public String toString() {
+			return String.format("%s [transactionalId=%s]", this.getClass().getSimpleName(), transactionalId);
+		}
+	}
+
+	/**
+	 * Context associated to this instance of the {@link FlinkKafkaProducer011}. User for keeping track of the
+	 * transactionalIds.
+	 */
+	public static class KafkaTransactionContext {
+		public final Set<String> transactionalIds;
+
+		public KafkaTransactionContext(Set<String> transactionalIds) {
+			this.transactionalIds = transactionalIds;
+		}
+	}
+
+	static class ProducersPool implements Closeable {
+		private final LinkedBlockingDeque<FlinkKafkaProducer<byte[], byte[]>> pool = new LinkedBlockingDeque<>();
+
+		public void add(FlinkKafkaProducer<byte[], byte[]> producer) {
+			pool.add(producer);
+		}
+
+		public FlinkKafkaProducer<byte[], byte[]> poll() {
+			return pool.poll();
+		}
+
+		@Override
+		public void close() {
+			while (!pool.isEmpty()) {
+				pool.poll().close();
+			}
+		}
+	}
+
+	/**
+	 * Keep information required to deduce next safe to use transactional id.
+	 */
+	public static class NextTransactionalIdHint {
+		public int lastParallelism = 0;
+		public long nextFreeTransactionalId = 0;
+
+		public NextTransactionalIdHint() {
+			this(0, 0);
+		}
+
+		public NextTransactionalIdHint(int parallelism, long nextFreeTransactionalId) {
+			this.lastParallelism = parallelism;
+			this.nextFreeTransactionalId = nextFreeTransactionalId;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
new file mode 100644
index 0000000..edc37cb
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
+
+import java.util.Properties;
+
+/**
+ * Kafka {@link StreamTableSource} for Kafka 0.11.
+ */
+public class Kafka011AvroTableSource extends KafkaAvroTableSource {
+
+	/**
+	 * Creates a Kafka 0.11 Avro {@link StreamTableSource} using a given {@link SpecificRecord}.
+	 *
+	 * @param topic      Kafka topic to consume.
+	 * @param properties Properties for the Kafka consumer.
+	 * @param record     Avro specific record.
+	 */
+	public Kafka011AvroTableSource(
+		String topic,
+		Properties properties,
+		Class<? extends SpecificRecordBase> record) {
+
+		super(
+			topic,
+			properties,
+			record);
+	}
+
+	@Override
+	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
+		return new FlinkKafkaConsumer011<>(topic, deserializationSchema, properties);
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
new file mode 100644
index 0000000..471c2d2
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+
+import java.util.Properties;
+
+/**
+ * Kafka {@link StreamTableSource} for Kafka 0.11.
+ */
+public class Kafka011JsonTableSource extends KafkaJsonTableSource {
+
+	/**
+	 * Creates a Kafka 0.11 JSON {@link StreamTableSource}.
+	 *
+	 * @param topic      Kafka topic to consume.
+	 * @param properties Properties for the Kafka consumer.
+	 * @param typeInfo   Type information describing the result type. The field names are used
+	 *                   to parse the JSON file and so are the types.
+	 */
+	public Kafka011JsonTableSource(
+			String topic,
+			Properties properties,
+			TypeInformation<Row> typeInfo) {
+
+		super(topic, properties, typeInfo);
+	}
+
+	@Override
+	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
+		return new FlinkKafkaConsumer011<>(topic, deserializationSchema, properties);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
new file mode 100644
index 0000000..5eaea97
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+
+import java.util.Properties;
+
+/**
+ * Kafka {@link StreamTableSource} for Kafka 0.11.
+ */
+public class Kafka011TableSource extends Kafka09TableSource {
+
+	/**
+	 * Creates a Kafka 0.11 {@link StreamTableSource}.
+	 *
+	 * @param topic                 Kafka topic to consume.
+	 * @param properties            Properties for the Kafka consumer.
+	 * @param deserializationSchema Deserialization schema to use for Kafka records.
+	 * @param typeInfo              Type information describing the result type. The field names are used
+	 *                              to parse the JSON file and so are the types.
+	 */
+	public Kafka011TableSource(
+			String topic,
+			Properties properties,
+			DeserializationSchema<Row> deserializationSchema,
+			TypeInformation<Row> typeInfo) {
+
+		super(topic, properties, deserializationSchema, typeInfo);
+	}
+
+	@Override
+	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
+		return new FlinkKafkaConsumer011<>(topic, deserializationSchema, properties);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/metrics/KafkaMetricMuttableWrapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/metrics/KafkaMetricMuttableWrapper.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/metrics/KafkaMetricMuttableWrapper.java
new file mode 100644
index 0000000..a22ff5c
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/metrics/KafkaMetricMuttableWrapper.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal.metrics;
+
+import org.apache.flink.metrics.Gauge;
+
+import org.apache.kafka.common.Metric;
+
+/**
+ * Gauge for getting the current value of a Kafka metric.
+ */
+public class KafkaMetricMuttableWrapper implements Gauge<Double> {
+	private org.apache.kafka.common.Metric kafkaMetric;
+
+	public KafkaMetricMuttableWrapper(org.apache.kafka.common.Metric metric) {
+		this.kafkaMetric = metric;
+	}
+
+	@Override
+	public Double getValue() {
+		return kafkaMetric.value();
+	}
+
+	public void setKafkaMetric(Metric kafkaMetric) {
+		this.kafkaMetric = kafkaMetric;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
new file mode 100644
index 0000000..51410da
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import kafka.server.KafkaServer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * IT cases for the {@link FlinkKafkaProducer011}.
+ */
+@SuppressWarnings("serial")
+public class FlinkKafkaProducer011Tests extends KafkaTestBase {
+	protected String transactionalId;
+	protected Properties extraProperties;
+
+	protected TypeInformationSerializationSchema<Integer> integerSerializationSchema =
+			new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+	protected KeyedSerializationSchema<Integer> integerKeyedSerializationSchema =
+			new KeyedSerializationSchemaWrapper(integerSerializationSchema);
+
+	@Before
+	public void before() {
+		transactionalId = UUID.randomUUID().toString();
+		extraProperties = new Properties();
+		extraProperties.putAll(standardProps);
+		extraProperties.put("transactional.id", transactionalId);
+		extraProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+		extraProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+		extraProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+		extraProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+		extraProperties.put("isolation.level", "read_committed");
+	}
+
+	@Test(timeout = 30000L)
+	public void testHappyPath() throws IOException {
+		String topicName = "flink-kafka-producer-happy-path";
+		try (Producer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties)) {
+			kafkaProducer.initTransactions();
+			kafkaProducer.beginTransaction();
+			kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
+			kafkaProducer.commitTransaction();
+		}
+		assertRecord(topicName, "42", "42");
+		deleteTestTopic(topicName);
+	}
+
+	@Test(timeout = 30000L)
+	public void testResumeTransaction() throws IOException {
+		String topicName = "flink-kafka-producer-resume-transaction";
+		try (FlinkKafkaProducer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties)) {
+			kafkaProducer.initTransactions();
+			kafkaProducer.beginTransaction();
+			kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
+			kafkaProducer.flush();
+			long producerId = kafkaProducer.getProducerId();
+			short epoch = kafkaProducer.getEpoch();
+
+			try (FlinkKafkaProducer<String, String> resumeProducer = new FlinkKafkaProducer<>(extraProperties)) {
+				resumeProducer.resumeTransaction(producerId, epoch);
+				resumeProducer.commitTransaction();
+			}
+
+			assertRecord(topicName, "42", "42");
+
+			// this shouldn't throw - in case of network split, old producer might attempt to commit it's transaction
+			kafkaProducer.commitTransaction();
+
+			// this shouldn't fail also, for same reason as above
+			try (FlinkKafkaProducer<String, String> resumeProducer = new FlinkKafkaProducer<>(extraProperties)) {
+				resumeProducer.resumeTransaction(producerId, epoch);
+				resumeProducer.commitTransaction();
+			}
+		}
+		deleteTestTopic(topicName);
+	}
+
+	@Test(timeout = 120_000L)
+	public void testFlinkKafkaProducer011FailBeforeNotify() throws Exception {
+		String topic = "flink-kafka-producer-fail-before-notify";
+
+		OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic);
+
+		testHarness.setup();
+		testHarness.open();
+		testHarness.initializeState(null);
+		testHarness.processElement(42, 0);
+		testHarness.snapshot(0, 1);
+		testHarness.processElement(43, 2);
+		OperatorStateHandles snapshot = testHarness.snapshot(1, 3);
+
+		int leaderId = kafkaServer.getLeaderToShutDown(topic);
+		failBroker(leaderId);
+
+		try {
+			testHarness.processElement(44, 4);
+			testHarness.snapshot(2, 5);
+			assertFalse(true);
+		}
+		catch (Exception ex) {
+			// expected
+		}
+		try {
+			testHarness.close();
+		}
+		catch (Exception ex) {
+		}
+
+		kafkaServer.restartBroker(leaderId);
+
+		testHarness = createTestHarness(topic);
+		testHarness.setup();
+		testHarness.initializeState(snapshot);
+		testHarness.close();
+
+		assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43), 30_000L);
+
+		deleteTestTopic(topic);
+	}
+
+	@Test(timeout = 120_000L)
+	public void testFlinkKafkaProducer011FailTransactionCoordinatorBeforeNotify() throws Exception {
+		String topic = "flink-kafka-producer-fail-transaction-coordinator-before-notify";
+
+		Properties properties = createProperties();
+
+		FlinkKafkaProducer011<Integer> kafkaProducer = new FlinkKafkaProducer011<>(
+			topic,
+			integerKeyedSerializationSchema,
+			properties,
+			FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
+
+		OneInputStreamOperatorTestHarness<Integer, Object> testHarness1 = new OneInputStreamOperatorTestHarness<>(
+			new StreamSink<>(kafkaProducer),
+			IntSerializer.INSTANCE);
+
+		testHarness1.setup();
+		testHarness1.open();
+		testHarness1.initializeState(null);
+		testHarness1.processElement(42, 0);
+		testHarness1.snapshot(0, 1);
+		testHarness1.processElement(43, 2);
+		int transactionCoordinatorId = kafkaProducer.getTransactionCoordinatorId();
+		OperatorStateHandles snapshot = testHarness1.snapshot(1, 3);
+
+		failBroker(transactionCoordinatorId);
+
+		try {
+			testHarness1.processElement(44, 4);
+			testHarness1.notifyOfCompletedCheckpoint(1);
+			testHarness1.close();
+		}
+		catch (Exception ex) {
+			// Expected... some random exception could be thrown by any of the above operations.
+		}
+		finally {
+			kafkaServer.restartBroker(transactionCoordinatorId);
+		}
+
+		try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness2 = createTestHarness(topic)) {
+			testHarness2.setup();
+			testHarness2.initializeState(snapshot);
+			testHarness2.open();
+		}
+
+		assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43), 30_000L);
+
+		deleteTestTopic(topic);
+	}
+
+	/**
+	 * This tests checks whether FlinkKafkaProducer011 correctly aborts lingering transactions after a failure.
+	 * If such transactions were left alone lingering it consumers would be unable to read committed records
+	 * that were created after this lingering transaction.
+	 */
+	@Test(timeout = 120_000L)
+	public void testFailBeforeNotifyAndResumeWorkAfterwards() throws Exception {
+		String topic = "flink-kafka-producer-fail-before-notify";
+
+		OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic);
+
+		testHarness.setup();
+		testHarness.open();
+		testHarness.processElement(42, 0);
+		testHarness.snapshot(0, 1);
+		testHarness.processElement(43, 2);
+		OperatorStateHandles snapshot1 = testHarness.snapshot(1, 3);
+
+		testHarness.processElement(44, 4);
+		testHarness.snapshot(2, 5);
+		testHarness.processElement(45, 6);
+
+		// do not close previous testHarness to make sure that closing do not clean up something (in case of failure
+		// there might not be any close)
+		testHarness = createTestHarness(topic);
+		testHarness.setup();
+		// restore from snapshot1, transactions with records 44 and 45 should be aborted
+		testHarness.initializeState(snapshot1);
+		testHarness.open();
+
+		// write and commit more records, after potentially lingering transactions
+		testHarness.processElement(46, 7);
+		testHarness.snapshot(4, 8);
+		testHarness.processElement(47, 9);
+		testHarness.notifyOfCompletedCheckpoint(4);
+
+		//now we should have:
+		// - records 42 and 43 in committed transactions
+		// - aborted transactions with records 44 and 45
+		// - committed transaction with record 46
+		// - pending transaction with record 47
+		assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43, 46), 30_000L);
+
+		testHarness.close();
+		deleteTestTopic(topic);
+	}
+
+	private OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness(String topic) throws Exception {
+		Properties properties = createProperties();
+
+		FlinkKafkaProducer011<Integer> kafkaProducer = new FlinkKafkaProducer011<>(
+			topic,
+			integerKeyedSerializationSchema,
+			properties,
+			FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
+
+		return new OneInputStreamOperatorTestHarness<>(
+			new StreamSink<>(kafkaProducer),
+			IntSerializer.INSTANCE);
+	}
+
+	private Properties createProperties() {
+		Properties properties = new Properties();
+		properties.putAll(standardProps);
+		properties.putAll(secureProps);
+		properties.put(FlinkKafkaProducer011.KEY_DISABLE_METRICS, "true");
+		return properties;
+	}
+
+	@Test
+	public void testRecoverCommittedTransaction() throws Exception {
+		String topic = "flink-kafka-producer-recover-committed-transaction";
+
+		OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic);
+
+		testHarness.setup();
+		testHarness.open(); // producerA - start transaction (txn) 0
+		testHarness.processElement(42, 0); // producerA - write 42 in txn 0
+		OperatorStateHandles checkpoint0 = testHarness.snapshot(0, 1); // producerA - pre commit txn 0, producerB - start txn 1
+		testHarness.processElement(43, 2); // producerB - write 43 in txn 1
+		testHarness.notifyOfCompletedCheckpoint(0); // producerA - commit txn 0 and return to the pool
+		testHarness.snapshot(1, 3); // producerB - pre txn 1,  producerA - start txn 2
+		testHarness.processElement(44, 4); // producerA - write 44 in txn 2
+		testHarness.close(); // producerA - abort txn 2
+
+		testHarness = createTestHarness(topic);
+		testHarness.initializeState(checkpoint0); // recover state 0 - producerA recover and commit txn 0
+		testHarness.close();
+
+		assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42), 30_000L);
+
+		deleteTestTopic(topic);
+	}
+
+	@Test
+	public void testRunOutOfProducersInThePool() throws Exception {
+		String topic = "flink-kafka-run-out-of-producers";
+
+		try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic)) {
+
+			testHarness.setup();
+			testHarness.open();
+
+			for (int i = 0; i < FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE * 2; i++) {
+				testHarness.processElement(i, i * 2);
+				testHarness.snapshot(i, i * 2 + 1);
+			}
+		}
+		catch (Exception ex) {
+			if (!ex.getCause().getMessage().startsWith("Too many ongoing")) {
+				throw ex;
+			}
+		}
+		deleteTestTopic(topic);
+	}
+
+	// shut down a Kafka broker
+	private void failBroker(int brokerId) {
+		KafkaServer toShutDown = null;
+		for (KafkaServer server : kafkaServer.getBrokers()) {
+
+			if (kafkaServer.getBrokerId(server) == brokerId) {
+				toShutDown = server;
+				break;
+			}
+		}
+
+		if (toShutDown == null) {
+			StringBuilder listOfBrokers = new StringBuilder();
+			for (KafkaServer server : kafkaServer.getBrokers()) {
+				listOfBrokers.append(kafkaServer.getBrokerId(server));
+				listOfBrokers.append(" ; ");
+			}
+
+			throw new IllegalArgumentException("Cannot find broker to shut down: " + brokerId
+				+ " ; available brokers: " + listOfBrokers.toString());
+		} else {
+			toShutDown.shutdown();
+			toShutDown.awaitShutdown();
+		}
+	}
+
+	private void assertRecord(String topicName, String expectedKey, String expectedValue) {
+		try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(extraProperties)) {
+			kafkaConsumer.subscribe(Collections.singletonList(topicName));
+			ConsumerRecords<String, String> records = kafkaConsumer.poll(10000);
+
+			ConsumerRecord<String, String> record = Iterables.getOnlyElement(records);
+			assertEquals(expectedKey, record.key());
+			assertEquals(expectedValue, record.value());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java
new file mode 100644
index 0000000..e60bf17
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.types.Row;
+
+import java.util.Properties;
+
+/**
+ * Tests for the {@link Kafka011AvroTableSource}.
+ */
+public class Kafka011AvroTableSourceTest extends KafkaTableSourceTestBase {
+
+	@Override
+	protected KafkaTableSource createTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo) {
+
+		return new Kafka011AvroTableSource(
+			topic,
+			properties,
+			AvroSpecificRecord.class);
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
+		return (Class) AvroRowDeserializationSchema.class;
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
+		return (Class) FlinkKafkaConsumer011.class;
+	}
+}
+


[3/9] flink git commit: [FLINK-6988] Make TwoPhaseCommitSinkFunction work with Context

Posted by al...@apache.org.
[FLINK-6988] Make TwoPhaseCommitSinkFunction work with Context


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/49cef0c0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/49cef0c0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/49cef0c0

Branch: refs/heads/master
Commit: 49cef0c0c24c15b668381ca590b87a62a14f75b5
Parents: 9a3621b
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Sep 25 16:16:34 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Oct 9 18:58:36 2017 +0200

----------------------------------------------------------------------
 .../functions/sink/TwoPhaseCommitSinkFunction.java    | 14 +++++++++++---
 .../sink/TwoPhaseCommitSinkFunctionTest.java          |  2 +-
 2 files changed, 12 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/49cef0c0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
index 6040979..2dfa292 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
@@ -107,7 +107,7 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT>
 	/**
 	 * Write value within a transaction.
 	 */
-	protected abstract void invoke(TXN transaction, IN value) throws Exception;
+	protected abstract void invoke(TXN transaction, IN value, Context context) throws Exception;
 
 	/**
 	 * Method that starts a new transaction.
@@ -159,9 +159,17 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT>
 
 	// ------ entry points for above methods implementing {@CheckPointedFunction} and {@CheckpointListener} ------
 
+
+	/**
+	 * This should not be implemented by subclasses.
+	 */
+	@Override
+	public final void invoke(IN value) throws Exception {}
+
 	@Override
-	public final void invoke(IN value) throws Exception {
-		invoke(currentTransaction, value);
+	public final void invoke(
+		IN value, Context context) throws Exception {
+		invoke(currentTransaction, value, context);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/49cef0c0/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
index 4715c39..3043512 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
@@ -136,7 +136,7 @@ public class TwoPhaseCommitSinkFunctionTest {
 		}
 
 		@Override
-		protected void invoke(FileTransaction transaction, String value) throws Exception {
+		protected void invoke(FileTransaction transaction, String value, Context context) throws Exception {
 			transaction.writer.write(value);
 		}