You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by on 2017/11/02 10:00:05 UTC

[07/11] flink git commit: [hotfix][kafka-tests] Fix test names so that they are not ignored by mvn build

[hotfix][kafka-tests] Fix test names so that they are not ignored by mvn build


Branch: refs/heads/master
Commit: 856b6baf1672ac0a9eaedc56cb18562e934ebac3
Parents: 872c35e
Author: Piotr Nowojski <>
Authored: Fri Oct 27 15:11:24 2017 +0200
Committer: Tzu-Li (Gordon) Tai <>
Committed: Thu Nov 2 12:43:20 2017 +0800

 .../kafka/        | 519 +++++++++++++++++++
 .../kafka/       | 519 -------------------
 .../kafka/           | 114 ++++
 .../kafka/          | 114 ----
 4 files changed, 633 insertions(+), 633 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/
new file mode 100644
index 0000000..1b87ff7
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/
@@ -0,0 +1,519 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
+import kafka.server.KafkaServer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.junit.Before;
+import org.junit.Test;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+ * IT cases for the {@link FlinkKafkaProducer011}.
+ */
+public class FlinkKafkaProducer011Test extends KafkaTestBase {
+	protected String transactionalId;
+	protected Properties extraProperties;
+	protected TypeInformationSerializationSchema<Integer> integerSerializationSchema =
+			new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+	protected KeyedSerializationSchema<Integer> integerKeyedSerializationSchema =
+			new KeyedSerializationSchemaWrapper<>(integerSerializationSchema);
+	@Before
+	public void before() {
+		transactionalId = UUID.randomUUID().toString();
+		extraProperties = new Properties();
+		extraProperties.putAll(standardProps);
+		extraProperties.put("", transactionalId);
+		extraProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+		extraProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+		extraProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+		extraProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+		extraProperties.put("isolation.level", "read_committed");
+	}
+	@Test(timeout = 120_000L)
+	public void testFlinkKafkaProducer011FailBeforeNotify() throws Exception {
+		String topic = "flink-kafka-producer-fail-before-notify";
+		OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic);
+		testHarness.setup();
+		testHarness.initializeState(null);
+		testHarness.processElement(42, 0);
+		testHarness.snapshot(0, 1);
+		testHarness.processElement(43, 2);
+		OperatorStateHandles snapshot = testHarness.snapshot(1, 3);
+		int leaderId = kafkaServer.getLeaderToShutDown(topic);
+		failBroker(leaderId);
+		try {
+			testHarness.processElement(44, 4);
+			testHarness.snapshot(2, 5);
+			assertFalse(true);
+		}
+		catch (Exception ex) {
+			// expected
+		}
+		try {
+			testHarness.close();
+		}
+		catch (Exception ex) {
+		}
+		kafkaServer.restartBroker(leaderId);
+		testHarness = createTestHarness(topic);
+		testHarness.setup();
+		testHarness.initializeState(snapshot);
+		testHarness.close();
+		assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43), 30_000L);
+		deleteTestTopic(topic);
+	}
+	@Test(timeout = 120_000L)
+	public void testFlinkKafkaProducer011FailTransactionCoordinatorBeforeNotify() throws Exception {
+		String topic = "flink-kafka-producer-fail-transaction-coordinator-before-notify";
+		Properties properties = createProperties();
+		FlinkKafkaProducer011<Integer> kafkaProducer = new FlinkKafkaProducer011<>(
+			topic,
+			integerKeyedSerializationSchema,
+			properties,
+			FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
+		OneInputStreamOperatorTestHarness<Integer, Object> testHarness1 = new OneInputStreamOperatorTestHarness<>(
+			new StreamSink<>(kafkaProducer),
+			IntSerializer.INSTANCE);
+		testHarness1.setup();
+		testHarness1.initializeState(null);
+		testHarness1.processElement(42, 0);
+		testHarness1.snapshot(0, 1);
+		testHarness1.processElement(43, 2);
+		int transactionCoordinatorId = kafkaProducer.getTransactionCoordinatorId();
+		OperatorStateHandles snapshot = testHarness1.snapshot(1, 3);
+		failBroker(transactionCoordinatorId);
+		try {
+			testHarness1.processElement(44, 4);
+			testHarness1.notifyOfCompletedCheckpoint(1);
+			testHarness1.close();
+		}
+		catch (Exception ex) {
+			// Expected... some random exception could be thrown by any of the above operations.
+		}
+		finally {
+			kafkaServer.restartBroker(transactionCoordinatorId);
+		}
+		try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness2 = createTestHarness(topic)) {
+			testHarness2.setup();
+			testHarness2.initializeState(snapshot);
+		}
+		assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43), 30_000L);
+		deleteTestTopic(topic);
+	}
+	/**
+	 * This tests checks whether FlinkKafkaProducer011 correctly aborts lingering transactions after a failure.
+	 * If such transactions were left alone lingering it consumers would be unable to read committed records
+	 * that were created after this lingering transaction.
+	 */
+	@Test(timeout = 120_000L)
+	public void testFailBeforeNotifyAndResumeWorkAfterwards() throws Exception {
+		String topic = "flink-kafka-producer-fail-before-notify";
+		OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic);
+		testHarness.setup();
+		testHarness.processElement(42, 0);
+		testHarness.snapshot(0, 1);
+		testHarness.processElement(43, 2);
+		OperatorStateHandles snapshot1 = testHarness.snapshot(1, 3);
+		testHarness.processElement(44, 4);
+		testHarness.snapshot(2, 5);
+		testHarness.processElement(45, 6);
+		// do not close previous testHarness to make sure that closing do not clean up something (in case of failure
+		// there might not be any close)
+		testHarness = createTestHarness(topic);
+		testHarness.setup();
+		// restore from snapshot1, transactions with records 44 and 45 should be aborted
+		testHarness.initializeState(snapshot1);
+		// write and commit more records, after potentially lingering transactions
+		testHarness.processElement(46, 7);
+		testHarness.snapshot(4, 8);
+		testHarness.processElement(47, 9);
+		testHarness.notifyOfCompletedCheckpoint(4);
+		//now we should have:
+		// - records 42 and 43 in committed transactions
+		// - aborted transactions with records 44 and 45
+		// - committed transaction with record 46
+		// - pending transaction with record 47
+		assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43, 46), 30_000L);
+		testHarness.close();
+		deleteTestTopic(topic);
+	}
+	/**
+	 * This tests checks whether FlinkKafkaProducer011 correctly aborts lingering transactions after a failure,
+	 * which happened before first checkpoint and was followed up by reducing the parallelism.
+	 * If such transactions were left alone lingering it consumers would be unable to read committed records
+	 * that were created after this lingering transaction.
+	 */
+	@Test(timeout = 120_000L)
+	public void testScaleDownBeforeFirstCheckpoint() throws Exception {
+		String topic = "scale-down-before-first-checkpoint";
+		List<AutoCloseable> operatorsToClose = new ArrayList<>();
+		int preScaleDownParallelism = Math.max(2, FlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTOR);
+		for (int subtaskIndex = 0; subtaskIndex < preScaleDownParallelism; subtaskIndex++) {
+			OneInputStreamOperatorTestHarness<Integer, Object> preScaleDownOperator = createTestHarness(
+				topic,
+				preScaleDownParallelism,
+				preScaleDownParallelism,
+				subtaskIndex);
+			preScaleDownOperator.setup();
+			preScaleDownOperator.processElement(subtaskIndex * 2, 0);
+			preScaleDownOperator.snapshot(0, 1);
+			preScaleDownOperator.processElement(subtaskIndex * 2 + 1, 2);
+			operatorsToClose.add(preScaleDownOperator);
+		}
+		// do not close previous testHarnesses to make sure that closing do not clean up something (in case of failure
+		// there might not be any close)
+		// After previous failure simulate restarting application with smaller parallelism
+		OneInputStreamOperatorTestHarness<Integer, Object> postScaleDownOperator1 = createTestHarness(topic, 1, 1, 0);
+		postScaleDownOperator1.setup();
+		// write and commit more records, after potentially lingering transactions
+		postScaleDownOperator1.processElement(46, 7);
+		postScaleDownOperator1.snapshot(4, 8);
+		postScaleDownOperator1.processElement(47, 9);
+		postScaleDownOperator1.notifyOfCompletedCheckpoint(4);
+		//now we should have:
+		// - records 42, 43, 44 and 45 in aborted transactions
+		// - committed transaction with record 46
+		// - pending transaction with record 47
+		assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(46), 30_000L);
+		postScaleDownOperator1.close();
+		// ignore ProducerFencedExceptions, because postScaleDownOperator1 could reuse transactional ids.
+		for (AutoCloseable operatorToClose : operatorsToClose) {
+			closeIgnoringProducerFenced(operatorToClose);
+		}
+		deleteTestTopic(topic);
+	}
+	/**
+	 * Each instance of FlinkKafkaProducer011 uses it's own pool of transactional ids. After the restore from checkpoint
+	 * transactional ids are redistributed across the subtasks. In case of scale down, the surplus transactional ids
+	 * are dropped. In case of scale up, new one are generated (for the new subtasks). This test make sure that sequence
+	 * of scaling down and up again works fine. Especially it checks whether the newly generated ids in scaling up
+	 * do not overlap with ids that were used before scaling down. For example we start with 4 ids and parallelism 4:
+	 * [1], [2], [3], [4] - one assigned per each subtask
+	 * we scale down to parallelism 2:
+	 * [1, 2], [3, 4] - first subtask got id 1 and 2, second got ids 3 and 4
+	 * surplus ids are dropped from the pools and we scale up to parallelism 3:
+	 * [1 or 2], [3 or 4], [???]
+	 * new subtask have to generate new id(s), but he can not use ids that are potentially in use, so it has to generate
+	 * new ones that are greater then 4.
+	 */
+	@Test(timeout = 120_000L)
+	public void testScaleUpAfterScalingDown() throws Exception {
+		String topic = "scale-down-before-first-checkpoint";
+		final int parallelism1 = 4;
+		final int parallelism2 = 2;
+		final int parallelism3 = 3;
+		final int maxParallelism = Math.max(parallelism1, Math.max(parallelism2, parallelism3));
+		List<OperatorStateHandle> operatorStateHandles = repartitionAndExecute(
+			topic,
+			Collections.emptyList(),
+			parallelism1,
+			maxParallelism,
+			IntStream.range(0, parallelism1).boxed().iterator());
+		operatorStateHandles = repartitionAndExecute(
+			topic,
+			operatorStateHandles,
+			parallelism2,
+			maxParallelism,
+			IntStream.range(parallelism1,  parallelism1 + parallelism2).boxed().iterator());
+		operatorStateHandles = repartitionAndExecute(
+			topic,
+			operatorStateHandles,
+			parallelism3,
+			maxParallelism,
+			IntStream.range(parallelism1 + parallelism2,  parallelism1 + parallelism2 + parallelism3).boxed().iterator());
+		// After each previous repartitionAndExecute call, we are left with some lingering transactions, that would
+		// not allow us to read all committed messages from the topic. Thus we initialize operators from
+		// operatorStateHandles once more, but without any new data. This should terminate all ongoing transactions.
+		operatorStateHandles = repartitionAndExecute(
+			topic,
+			operatorStateHandles,
+			1,
+			maxParallelism,
+			Collections.emptyIterator());
+		assertExactlyOnceForTopic(
+			createProperties(),
+			topic,
+			0,
+			IntStream.range(0, parallelism1 + parallelism2 + parallelism3).boxed().collect(Collectors.toList()),
+			30_000L);
+		deleteTestTopic(topic);
+	}
+	private List<OperatorStateHandle> repartitionAndExecute(
+			String topic,
+			List<OperatorStateHandle> inputStates,
+			int parallelism,
+			int maxParallelism,
+			Iterator<Integer> inputData) throws Exception {
+		List<OperatorStateHandle> outputStates = new ArrayList<>();
+		List<OneInputStreamOperatorTestHarness<Integer, Object>> testHarnesses = new ArrayList<>();
+		for (int subtaskIndex = 0; subtaskIndex < parallelism; subtaskIndex++) {
+			OneInputStreamOperatorTestHarness<Integer, Object> testHarness =
+				createTestHarness(topic, maxParallelism, parallelism, subtaskIndex);
+			testHarnesses.add(testHarness);
+			testHarness.setup();
+			testHarness.initializeState(new OperatorStateHandles(
+				0,
+				Collections.emptyList(),
+				Collections.emptyList(),
+				inputStates,
+				Collections.emptyList()));
+			if (inputData.hasNext()) {
+				int nextValue =;
+				testHarness.processElement(nextValue, 0);
+				OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+				outputStates.addAll(snapshot.getManagedOperatorState());
+				checkState(snapshot.getRawOperatorState() == null, "Unexpected raw operator state");
+				checkState(snapshot.getManagedKeyedState() == null, "Unexpected managed keyed state");
+				checkState(snapshot.getRawKeyedState() == null, "Unexpected raw keyed state");
+				for (int i = 1; i < FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE - 1; i++) {
+					testHarness.processElement(-nextValue, 0);
+					testHarness.snapshot(i, 0);
+				}
+			}
+		}
+		for (OneInputStreamOperatorTestHarness<Integer, Object> testHarness : testHarnesses) {
+			testHarness.close();
+		}
+		return outputStates;
+	}
+	@Test
+	public void testRecoverCommittedTransaction() throws Exception {
+		String topic = "flink-kafka-producer-recover-committed-transaction";
+		OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic);
+		testHarness.setup();
+; // producerA - start transaction (txn) 0
+		testHarness.processElement(42, 0); // producerA - write 42 in txn 0
+		OperatorStateHandles checkpoint0 = testHarness.snapshot(0, 1); // producerA - pre commit txn 0, producerB - start txn 1
+		testHarness.processElement(43, 2); // producerB - write 43 in txn 1
+		testHarness.notifyOfCompletedCheckpoint(0); // producerA - commit txn 0 and return to the pool
+		testHarness.snapshot(1, 3); // producerB - pre txn 1,  producerA - start txn 2
+		testHarness.processElement(44, 4); // producerA - write 44 in txn 2
+		testHarness.close(); // producerA - abort txn 2
+		testHarness = createTestHarness(topic);
+		testHarness.initializeState(checkpoint0); // recover state 0 - producerA recover and commit txn 0
+		testHarness.close();
+		assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42), 30_000L);
+		deleteTestTopic(topic);
+	}
+	@Test
+	public void testRunOutOfProducersInThePool() throws Exception {
+		String topic = "flink-kafka-run-out-of-producers";
+		try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic)) {
+			testHarness.setup();
+			for (int i = 0; i < FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE * 2; i++) {
+				testHarness.processElement(i, i * 2);
+				testHarness.snapshot(i, i * 2 + 1);
+			}
+		}
+		catch (Exception ex) {
+			if (!ex.getCause().getMessage().startsWith("Too many ongoing")) {
+				throw ex;
+			}
+		}
+		deleteTestTopic(topic);
+	}
+	// shut down a Kafka broker
+	private void failBroker(int brokerId) {
+		KafkaServer toShutDown = null;
+		for (KafkaServer server : kafkaServer.getBrokers()) {
+			if (kafkaServer.getBrokerId(server) == brokerId) {
+				toShutDown = server;
+				break;
+			}
+		}
+		if (toShutDown == null) {
+			StringBuilder listOfBrokers = new StringBuilder();
+			for (KafkaServer server : kafkaServer.getBrokers()) {
+				listOfBrokers.append(kafkaServer.getBrokerId(server));
+				listOfBrokers.append(" ; ");
+			}
+			throw new IllegalArgumentException("Cannot find broker to shut down: " + brokerId
+				+ " ; available brokers: " + listOfBrokers.toString());
+		} else {
+			toShutDown.shutdown();
+			toShutDown.awaitShutdown();
+		}
+	}
+	private void assertRecord(String topicName, String expectedKey, String expectedValue) {
+		try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(extraProperties)) {
+			kafkaConsumer.subscribe(Collections.singletonList(topicName));
+			ConsumerRecords<String, String> records = kafkaConsumer.poll(10000);
+			ConsumerRecord<String, String> record = Iterables.getOnlyElement(records);
+			assertEquals(expectedKey, record.key());
+			assertEquals(expectedValue, record.value());
+		}
+	}
+	private void closeIgnoringProducerFenced(AutoCloseable autoCloseable) throws Exception {
+		try {
+			autoCloseable.close();
+		}
+		catch (Exception ex) {
+			if (!(ex.getCause() instanceof ProducerFencedException)) {
+				throw ex;
+			}
+		}
+	}
+	private OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness(String topic) throws Exception {
+		return createTestHarness(topic, 1, 1, 0);
+	}
+	private OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness(
+		String topic,
+		int maxParallelism,
+		int parallelism,
+		int subtaskIndex) throws Exception {
+		Properties properties = createProperties();
+		FlinkKafkaProducer011<Integer> kafkaProducer = new FlinkKafkaProducer011<>(
+			topic,
+			integerKeyedSerializationSchema,
+			properties,
+			FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
+		return new OneInputStreamOperatorTestHarness<>(
+			new StreamSink<>(kafkaProducer),
+			maxParallelism,
+			parallelism,
+			subtaskIndex,
+			IntSerializer.INSTANCE);
+	}
+	private Properties createProperties() {
+		Properties properties = new Properties();
+		properties.putAll(standardProps);
+		properties.putAll(secureProps);
+		properties.put(FlinkKafkaProducer011.KEY_DISABLE_METRICS, "true");
+		return properties;
+	}
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/
deleted file mode 100644
index 381ba33..0000000
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/
+++ /dev/null
@@ -1,519 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
-import kafka.server.KafkaServer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.errors.ProducerFencedException;
-import org.junit.Before;
-import org.junit.Test;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-import java.util.UUID;
-import static org.apache.flink.util.Preconditions.checkState;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
- * IT cases for the {@link FlinkKafkaProducer011}.
- */
-public class FlinkKafkaProducer011Tests extends KafkaTestBase {
-	protected String transactionalId;
-	protected Properties extraProperties;
-	protected TypeInformationSerializationSchema<Integer> integerSerializationSchema =
-			new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-	protected KeyedSerializationSchema<Integer> integerKeyedSerializationSchema =
-			new KeyedSerializationSchemaWrapper<>(integerSerializationSchema);
-	@Before
-	public void before() {
-		transactionalId = UUID.randomUUID().toString();
-		extraProperties = new Properties();
-		extraProperties.putAll(standardProps);
-		extraProperties.put("", transactionalId);
-		extraProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-		extraProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-		extraProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-		extraProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-		extraProperties.put("isolation.level", "read_committed");
-	}
-	@Test(timeout = 120_000L)
-	public void testFlinkKafkaProducer011FailBeforeNotify() throws Exception {
-		String topic = "flink-kafka-producer-fail-before-notify";
-		OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic);
-		testHarness.setup();
-		testHarness.initializeState(null);
-		testHarness.processElement(42, 0);
-		testHarness.snapshot(0, 1);
-		testHarness.processElement(43, 2);
-		OperatorStateHandles snapshot = testHarness.snapshot(1, 3);
-		int leaderId = kafkaServer.getLeaderToShutDown(topic);
-		failBroker(leaderId);
-		try {
-			testHarness.processElement(44, 4);
-			testHarness.snapshot(2, 5);
-			assertFalse(true);
-		}
-		catch (Exception ex) {
-			// expected
-		}
-		try {
-			testHarness.close();
-		}
-		catch (Exception ex) {
-		}
-		kafkaServer.restartBroker(leaderId);
-		testHarness = createTestHarness(topic);
-		testHarness.setup();
-		testHarness.initializeState(snapshot);
-		testHarness.close();
-		assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43), 30_000L);
-		deleteTestTopic(topic);
-	}
-	@Test(timeout = 120_000L)
-	public void testFlinkKafkaProducer011FailTransactionCoordinatorBeforeNotify() throws Exception {
-		String topic = "flink-kafka-producer-fail-transaction-coordinator-before-notify";
-		Properties properties = createProperties();
-		FlinkKafkaProducer011<Integer> kafkaProducer = new FlinkKafkaProducer011<>(
-			topic,
-			integerKeyedSerializationSchema,
-			properties,
-			FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
-		OneInputStreamOperatorTestHarness<Integer, Object> testHarness1 = new OneInputStreamOperatorTestHarness<>(
-			new StreamSink<>(kafkaProducer),
-			IntSerializer.INSTANCE);
-		testHarness1.setup();
-		testHarness1.initializeState(null);
-		testHarness1.processElement(42, 0);
-		testHarness1.snapshot(0, 1);
-		testHarness1.processElement(43, 2);
-		int transactionCoordinatorId = kafkaProducer.getTransactionCoordinatorId();
-		OperatorStateHandles snapshot = testHarness1.snapshot(1, 3);
-		failBroker(transactionCoordinatorId);
-		try {
-			testHarness1.processElement(44, 4);
-			testHarness1.notifyOfCompletedCheckpoint(1);
-			testHarness1.close();
-		}
-		catch (Exception ex) {
-			// Expected... some random exception could be thrown by any of the above operations.
-		}
-		finally {
-			kafkaServer.restartBroker(transactionCoordinatorId);
-		}
-		try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness2 = createTestHarness(topic)) {
-			testHarness2.setup();
-			testHarness2.initializeState(snapshot);
-		}
-		assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43), 30_000L);
-		deleteTestTopic(topic);
-	}
-	/**
-	 * This tests checks whether FlinkKafkaProducer011 correctly aborts lingering transactions after a failure.
-	 * If such transactions were left alone lingering it consumers would be unable to read committed records
-	 * that were created after this lingering transaction.
-	 */
-	@Test(timeout = 120_000L)
-	public void testFailBeforeNotifyAndResumeWorkAfterwards() throws Exception {
-		String topic = "flink-kafka-producer-fail-before-notify";
-		OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic);
-		testHarness.setup();
-		testHarness.processElement(42, 0);
-		testHarness.snapshot(0, 1);
-		testHarness.processElement(43, 2);
-		OperatorStateHandles snapshot1 = testHarness.snapshot(1, 3);
-		testHarness.processElement(44, 4);
-		testHarness.snapshot(2, 5);
-		testHarness.processElement(45, 6);
-		// do not close previous testHarness to make sure that closing do not clean up something (in case of failure
-		// there might not be any close)
-		testHarness = createTestHarness(topic);
-		testHarness.setup();
-		// restore from snapshot1, transactions with records 44 and 45 should be aborted
-		testHarness.initializeState(snapshot1);
-		// write and commit more records, after potentially lingering transactions
-		testHarness.processElement(46, 7);
-		testHarness.snapshot(4, 8);
-		testHarness.processElement(47, 9);
-		testHarness.notifyOfCompletedCheckpoint(4);
-		//now we should have:
-		// - records 42 and 43 in committed transactions
-		// - aborted transactions with records 44 and 45
-		// - committed transaction with record 46
-		// - pending transaction with record 47
-		assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43, 46), 30_000L);
-		testHarness.close();
-		deleteTestTopic(topic);
-	}
-	/**
-	 * This tests checks whether FlinkKafkaProducer011 correctly aborts lingering transactions after a failure,
-	 * which happened before first checkpoint and was followed up by reducing the parallelism.
-	 * If such transactions were left alone lingering it consumers would be unable to read committed records
-	 * that were created after this lingering transaction.
-	 */
-	@Test(timeout = 120_000L)
-	public void testScaleDownBeforeFirstCheckpoint() throws Exception {
-		String topic = "scale-down-before-first-checkpoint";
-		List<AutoCloseable> operatorsToClose = new ArrayList<>();
-		int preScaleDownParallelism = Math.max(2, FlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTOR);
-		for (int subtaskIndex = 0; subtaskIndex < preScaleDownParallelism; subtaskIndex++) {
-			OneInputStreamOperatorTestHarness<Integer, Object> preScaleDownOperator = createTestHarness(
-				topic,
-				preScaleDownParallelism,
-				preScaleDownParallelism,
-				subtaskIndex);
-			preScaleDownOperator.setup();
-			preScaleDownOperator.processElement(subtaskIndex * 2, 0);
-			preScaleDownOperator.snapshot(0, 1);
-			preScaleDownOperator.processElement(subtaskIndex * 2 + 1, 2);
-			operatorsToClose.add(preScaleDownOperator);
-		}
-		// do not close previous testHarnesses to make sure that closing do not clean up something (in case of failure
-		// there might not be any close)
-		// After previous failure simulate restarting application with smaller parallelism
-		OneInputStreamOperatorTestHarness<Integer, Object> postScaleDownOperator1 = createTestHarness(topic, 1, 1, 0);
-		postScaleDownOperator1.setup();
-		// write and commit more records, after potentially lingering transactions
-		postScaleDownOperator1.processElement(46, 7);
-		postScaleDownOperator1.snapshot(4, 8);
-		postScaleDownOperator1.processElement(47, 9);
-		postScaleDownOperator1.notifyOfCompletedCheckpoint(4);
-		//now we should have:
-		// - records 42, 43, 44 and 45 in aborted transactions
-		// - committed transaction with record 46
-		// - pending transaction with record 47
-		assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(46), 30_000L);
-		postScaleDownOperator1.close();
-		// ignore ProducerFencedExceptions, because postScaleDownOperator1 could reuse transactional ids.
-		for (AutoCloseable operatorToClose : operatorsToClose) {
-			closeIgnoringProducerFenced(operatorToClose);
-		}
-		deleteTestTopic(topic);
-	}
-	/**
-	 * Each instance of FlinkKafkaProducer011 uses it's own pool of transactional ids. After the restore from checkpoint
-	 * transactional ids are redistributed across the subtasks. In case of scale down, the surplus transactional ids
-	 * are dropped. In case of scale up, new one are generated (for the new subtasks). This test make sure that sequence
-	 * of scaling down and up again works fine. Especially it checks whether the newly generated ids in scaling up
-	 * do not overlap with ids that were used before scaling down. For example we start with 4 ids and parallelism 4:
-	 * [1], [2], [3], [4] - one assigned per each subtask
-	 * we scale down to parallelism 2:
-	 * [1, 2], [3, 4] - first subtask got id 1 and 2, second got ids 3 and 4
-	 * surplus ids are dropped from the pools and we scale up to parallelism 3:
-	 * [1 or 2], [3 or 4], [???]
-	 * new subtask have to generate new id(s), but he can not use ids that are potentially in use, so it has to generate
-	 * new ones that are greater then 4.
-	 */
-	@Test(timeout = 120_000L)
-	public void testScaleUpAfterScalingDown() throws Exception {
-		String topic = "scale-down-before-first-checkpoint";
-		final int parallelism1 = 4;
-		final int parallelism2 = 2;
-		final int parallelism3 = 3;
-		final int maxParallelism = Math.max(parallelism1, Math.max(parallelism2, parallelism3));
-		List<OperatorStateHandle> operatorStateHandles = repartitionAndExecute(
-			topic,
-			Collections.emptyList(),
-			parallelism1,
-			maxParallelism,
-			IntStream.range(0, parallelism1).boxed().iterator());
-		operatorStateHandles = repartitionAndExecute(
-			topic,
-			operatorStateHandles,
-			parallelism2,
-			maxParallelism,
-			IntStream.range(parallelism1,  parallelism1 + parallelism2).boxed().iterator());
-		operatorStateHandles = repartitionAndExecute(
-			topic,
-			operatorStateHandles,
-			parallelism3,
-			maxParallelism,
-			IntStream.range(parallelism1 + parallelism2,  parallelism1 + parallelism2 + parallelism3).boxed().iterator());
-		// After each previous repartitionAndExecute call, we are left with some lingering transactions, that would
-		// not allow us to read all committed messages from the topic. Thus we initialize operators from
-		// operatorStateHandles once more, but without any new data. This should terminate all ongoing transactions.
-		operatorStateHandles = repartitionAndExecute(
-			topic,
-			operatorStateHandles,
-			1,
-			maxParallelism,
-			Collections.emptyIterator());
-		assertExactlyOnceForTopic(
-			createProperties(),
-			topic,
-			0,
-			IntStream.range(0, parallelism1 + parallelism2 + parallelism3).boxed().collect(Collectors.toList()),
-			30_000L);
-		deleteTestTopic(topic);
-	}
-	private List<OperatorStateHandle> repartitionAndExecute(
-			String topic,
-			List<OperatorStateHandle> inputStates,
-			int parallelism,
-			int maxParallelism,
-			Iterator<Integer> inputData) throws Exception {
-		List<OperatorStateHandle> outputStates = new ArrayList<>();
-		List<OneInputStreamOperatorTestHarness<Integer, Object>> testHarnesses = new ArrayList<>();
-		for (int subtaskIndex = 0; subtaskIndex < parallelism; subtaskIndex++) {
-			OneInputStreamOperatorTestHarness<Integer, Object> testHarness =
-				createTestHarness(topic, maxParallelism, parallelism, subtaskIndex);
-			testHarnesses.add(testHarness);
-			testHarness.setup();
-			testHarness.initializeState(new OperatorStateHandles(
-				0,
-				Collections.emptyList(),
-				Collections.emptyList(),
-				inputStates,
-				Collections.emptyList()));
-			if (inputData.hasNext()) {
-				int nextValue =;
-				testHarness.processElement(nextValue, 0);
-				OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
-				outputStates.addAll(snapshot.getManagedOperatorState());
-				checkState(snapshot.getRawOperatorState() == null, "Unexpected raw operator state");
-				checkState(snapshot.getManagedKeyedState() == null, "Unexpected managed keyed state");
-				checkState(snapshot.getRawKeyedState() == null, "Unexpected raw keyed state");
-				for (int i = 1; i < FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE - 1; i++) {
-					testHarness.processElement(-nextValue, 0);
-					testHarness.snapshot(i, 0);
-				}
-			}
-		}
-		for (OneInputStreamOperatorTestHarness<Integer, Object> testHarness : testHarnesses) {
-			testHarness.close();
-		}
-		return outputStates;
-	}
-	@Test
-	public void testRecoverCommittedTransaction() throws Exception {
-		String topic = "flink-kafka-producer-recover-committed-transaction";
-		OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic);
-		testHarness.setup();
-; // producerA - start transaction (txn) 0
-		testHarness.processElement(42, 0); // producerA - write 42 in txn 0
-		OperatorStateHandles checkpoint0 = testHarness.snapshot(0, 1); // producerA - pre commit txn 0, producerB - start txn 1
-		testHarness.processElement(43, 2); // producerB - write 43 in txn 1
-		testHarness.notifyOfCompletedCheckpoint(0); // producerA - commit txn 0 and return to the pool
-		testHarness.snapshot(1, 3); // producerB - pre txn 1,  producerA - start txn 2
-		testHarness.processElement(44, 4); // producerA - write 44 in txn 2
-		testHarness.close(); // producerA - abort txn 2
-		testHarness = createTestHarness(topic);
-		testHarness.initializeState(checkpoint0); // recover state 0 - producerA recover and commit txn 0
-		testHarness.close();
-		assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42), 30_000L);
-		deleteTestTopic(topic);
-	}
-	@Test
-	public void testRunOutOfProducersInThePool() throws Exception {
-		String topic = "flink-kafka-run-out-of-producers";
-		try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic)) {
-			testHarness.setup();
-			for (int i = 0; i < FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE * 2; i++) {
-				testHarness.processElement(i, i * 2);
-				testHarness.snapshot(i, i * 2 + 1);
-			}
-		}
-		catch (Exception ex) {
-			if (!ex.getCause().getMessage().startsWith("Too many ongoing")) {
-				throw ex;
-			}
-		}
-		deleteTestTopic(topic);
-	}
-	// shut down a Kafka broker
-	private void failBroker(int brokerId) {
-		KafkaServer toShutDown = null;
-		for (KafkaServer server : kafkaServer.getBrokers()) {
-			if (kafkaServer.getBrokerId(server) == brokerId) {
-				toShutDown = server;
-				break;
-			}
-		}
-		if (toShutDown == null) {
-			StringBuilder listOfBrokers = new StringBuilder();
-			for (KafkaServer server : kafkaServer.getBrokers()) {
-				listOfBrokers.append(kafkaServer.getBrokerId(server));
-				listOfBrokers.append(" ; ");
-			}
-			throw new IllegalArgumentException("Cannot find broker to shut down: " + brokerId
-				+ " ; available brokers: " + listOfBrokers.toString());
-		} else {
-			toShutDown.shutdown();
-			toShutDown.awaitShutdown();
-		}
-	}
-	private void assertRecord(String topicName, String expectedKey, String expectedValue) {
-		try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(extraProperties)) {
-			kafkaConsumer.subscribe(Collections.singletonList(topicName));
-			ConsumerRecords<String, String> records = kafkaConsumer.poll(10000);
-			ConsumerRecord<String, String> record = Iterables.getOnlyElement(records);
-			assertEquals(expectedKey, record.key());
-			assertEquals(expectedValue, record.value());
-		}
-	}
-	private void closeIgnoringProducerFenced(AutoCloseable autoCloseable) throws Exception {
-		try {
-			autoCloseable.close();
-		}
-		catch (Exception ex) {
-			if (!(ex.getCause() instanceof ProducerFencedException)) {
-				throw ex;
-			}
-		}
-	}
-	private OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness(String topic) throws Exception {
-		return createTestHarness(topic, 1, 1, 0);
-	}
-	private OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness(
-		String topic,
-		int maxParallelism,
-		int parallelism,
-		int subtaskIndex) throws Exception {
-		Properties properties = createProperties();
-		FlinkKafkaProducer011<Integer> kafkaProducer = new FlinkKafkaProducer011<>(
-			topic,
-			integerKeyedSerializationSchema,
-			properties,
-			FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
-		return new OneInputStreamOperatorTestHarness<>(
-			new StreamSink<>(kafkaProducer),
-			maxParallelism,
-			parallelism,
-			subtaskIndex,
-			IntSerializer.INSTANCE);
-	}
-	private Properties createProperties() {
-		Properties properties = new Properties();
-		properties.putAll(standardProps);
-		properties.putAll(secureProps);
-		properties.put(FlinkKafkaProducer011.KEY_DISABLE_METRICS, "true");
-		return properties;
-	}
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/
new file mode 100644
index 0000000..ab26f8b
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/
@@ -0,0 +1,114 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.Before;
+import org.junit.Test;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.UUID;
+import static org.junit.Assert.assertEquals;
+ * Tests for our own {@link FlinkKafkaProducer}.
+ */
+public class FlinkKafkaProducerTest extends KafkaTestBase {
+	protected String transactionalId;
+	protected Properties extraProperties;
+	@Before
+	public void before() {
+		transactionalId = UUID.randomUUID().toString();
+		extraProperties = new Properties();
+		extraProperties.putAll(standardProps);
+		extraProperties.put("", transactionalId);
+		extraProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+		extraProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+		extraProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+		extraProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+		extraProperties.put("isolation.level", "read_committed");
+	}
+	@Test(timeout = 30000L)
+	public void testHappyPath() throws IOException {
+		String topicName = "flink-kafka-producer-happy-path";
+		try (Producer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties)) {
+			kafkaProducer.initTransactions();
+			kafkaProducer.beginTransaction();
+			kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
+			kafkaProducer.commitTransaction();
+		}
+		assertRecord(topicName, "42", "42");
+		deleteTestTopic(topicName);
+	}
+	@Test(timeout = 30000L)
+	public void testResumeTransaction() throws IOException {
+		String topicName = "flink-kafka-producer-resume-transaction";
+		try (FlinkKafkaProducer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties)) {
+			kafkaProducer.initTransactions();
+			kafkaProducer.beginTransaction();
+			kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
+			kafkaProducer.flush();
+			long producerId = kafkaProducer.getProducerId();
+			short epoch = kafkaProducer.getEpoch();
+			try (FlinkKafkaProducer<String, String> resumeProducer = new FlinkKafkaProducer<>(extraProperties)) {
+				resumeProducer.resumeTransaction(producerId, epoch);
+				resumeProducer.commitTransaction();
+			}
+			assertRecord(topicName, "42", "42");
+			// this shouldn't throw - in case of network split, old producer might attempt to commit it's transaction
+			kafkaProducer.commitTransaction();
+			// this shouldn't fail also, for same reason as above
+			try (FlinkKafkaProducer<String, String> resumeProducer = new FlinkKafkaProducer<>(extraProperties)) {
+				resumeProducer.resumeTransaction(producerId, epoch);
+				resumeProducer.commitTransaction();
+			}
+		}
+		deleteTestTopic(topicName);
+	}
+	private void assertRecord(String topicName, String expectedKey, String expectedValue) {
+		try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(extraProperties)) {
+			kafkaConsumer.subscribe(Collections.singletonList(topicName));
+			ConsumerRecords<String, String> records = kafkaConsumer.poll(10000);
+			ConsumerRecord<String, String> record = Iterables.getOnlyElement(records);
+			assertEquals(expectedKey, record.key());
+			assertEquals(expectedValue, record.value());
+		}
+	}
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/
deleted file mode 100644
index 18bbd8f..0000000
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/
+++ /dev/null
@@ -1,114 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.junit.Before;
-import org.junit.Test;
-import java.util.Collections;
-import java.util.Properties;
-import java.util.UUID;
-import static org.junit.Assert.assertEquals;
- * Tests for our own {@link FlinkKafkaProducer}.
- */
-public class FlinkKafkaProducerTests extends KafkaTestBase {
-	protected String transactionalId;
-	protected Properties extraProperties;
-	@Before
-	public void before() {
-		transactionalId = UUID.randomUUID().toString();
-		extraProperties = new Properties();
-		extraProperties.putAll(standardProps);
-		extraProperties.put("", transactionalId);
-		extraProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-		extraProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-		extraProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-		extraProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-		extraProperties.put("isolation.level", "read_committed");
-	}
-	@Test(timeout = 30000L)
-	public void testHappyPath() throws IOException {
-		String topicName = "flink-kafka-producer-happy-path";
-		try (Producer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties)) {
-			kafkaProducer.initTransactions();
-			kafkaProducer.beginTransaction();
-			kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
-			kafkaProducer.commitTransaction();
-		}
-		assertRecord(topicName, "42", "42");
-		deleteTestTopic(topicName);
-	}
-	@Test(timeout = 30000L)
-	public void testResumeTransaction() throws IOException {
-		String topicName = "flink-kafka-producer-resume-transaction";
-		try (FlinkKafkaProducer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties)) {
-			kafkaProducer.initTransactions();
-			kafkaProducer.beginTransaction();
-			kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
-			kafkaProducer.flush();
-			long producerId = kafkaProducer.getProducerId();
-			short epoch = kafkaProducer.getEpoch();
-			try (FlinkKafkaProducer<String, String> resumeProducer = new FlinkKafkaProducer<>(extraProperties)) {
-				resumeProducer.resumeTransaction(producerId, epoch);
-				resumeProducer.commitTransaction();
-			}
-			assertRecord(topicName, "42", "42");
-			// this shouldn't throw - in case of network split, old producer might attempt to commit it's transaction
-			kafkaProducer.commitTransaction();
-			// this shouldn't fail also, for same reason as above
-			try (FlinkKafkaProducer<String, String> resumeProducer = new FlinkKafkaProducer<>(extraProperties)) {
-				resumeProducer.resumeTransaction(producerId, epoch);
-				resumeProducer.commitTransaction();
-			}
-		}
-		deleteTestTopic(topicName);
-	}
-	private void assertRecord(String topicName, String expectedKey, String expectedValue) {
-		try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(extraProperties)) {
-			kafkaConsumer.subscribe(Collections.singletonList(topicName));
-			ConsumerRecords<String, String> records = kafkaConsumer.poll(10000);
-			ConsumerRecord<String, String> record = Iterables.getOnlyElement(records);
-			assertEquals(expectedKey, record.key());
-			assertEquals(expectedValue, record.value());
-		}
-	}