You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/11/02 10:00:05 UTC
[07/11] flink git commit: [hotfix][kafka-tests] Fix test names so
that they are not ignored by mvn build
[hotfix][kafka-tests] Fix test names so that they are not ignored by mvn build
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/856b6baf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/856b6baf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/856b6baf
Branch: refs/heads/master
Commit: 856b6baf1672ac0a9eaedc56cb18562e934ebac3
Parents: 872c35e
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Fri Oct 27 15:11:24 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Nov 2 12:43:20 2017 +0800
----------------------------------------------------------------------
.../kafka/FlinkKafkaProducer011Test.java | 519 +++++++++++++++++++
.../kafka/FlinkKafkaProducer011Tests.java | 519 -------------------
.../kafka/FlinkKafkaProducerTest.java | 114 ++++
.../kafka/FlinkKafkaProducerTests.java | 114 ----
4 files changed, 633 insertions(+), 633 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/856b6baf/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Test.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Test.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Test.java
new file mode 100644
index 0000000..1b87ff7
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Test.java
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import kafka.server.KafkaServer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * IT cases for the {@link FlinkKafkaProducer011}.
+ */
+@SuppressWarnings("serial")
+public class FlinkKafkaProducer011Test extends KafkaTestBase {
+ protected String transactionalId;
+ protected Properties extraProperties;
+
+ protected TypeInformationSerializationSchema<Integer> integerSerializationSchema =
+ new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+ protected KeyedSerializationSchema<Integer> integerKeyedSerializationSchema =
+ new KeyedSerializationSchemaWrapper<>(integerSerializationSchema);
+
+ @Before
+ public void before() {
+ transactionalId = UUID.randomUUID().toString();
+ extraProperties = new Properties();
+ extraProperties.putAll(standardProps);
+ extraProperties.put("transactional.id", transactionalId);
+ extraProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ extraProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ extraProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ extraProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ extraProperties.put("isolation.level", "read_committed");
+ }
+
+ @Test(timeout = 120_000L)
+ public void testFlinkKafkaProducer011FailBeforeNotify() throws Exception {
+ String topic = "flink-kafka-producer-fail-before-notify";
+
+ OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic);
+
+ testHarness.setup();
+ testHarness.open();
+ testHarness.initializeState(null);
+ testHarness.processElement(42, 0);
+ testHarness.snapshot(0, 1);
+ testHarness.processElement(43, 2);
+ OperatorStateHandles snapshot = testHarness.snapshot(1, 3);
+
+ int leaderId = kafkaServer.getLeaderToShutDown(topic);
+ failBroker(leaderId);
+
+ try {
+ testHarness.processElement(44, 4);
+ testHarness.snapshot(2, 5);
+ assertFalse(true);
+ }
+ catch (Exception ex) {
+ // expected
+ }
+ try {
+ testHarness.close();
+ }
+ catch (Exception ex) {
+ }
+
+ kafkaServer.restartBroker(leaderId);
+
+ testHarness = createTestHarness(topic);
+ testHarness.setup();
+ testHarness.initializeState(snapshot);
+ testHarness.close();
+
+ assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43), 30_000L);
+
+ deleteTestTopic(topic);
+ }
+
+ @Test(timeout = 120_000L)
+ public void testFlinkKafkaProducer011FailTransactionCoordinatorBeforeNotify() throws Exception {
+ String topic = "flink-kafka-producer-fail-transaction-coordinator-before-notify";
+
+ Properties properties = createProperties();
+
+ FlinkKafkaProducer011<Integer> kafkaProducer = new FlinkKafkaProducer011<>(
+ topic,
+ integerKeyedSerializationSchema,
+ properties,
+ FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
+
+ OneInputStreamOperatorTestHarness<Integer, Object> testHarness1 = new OneInputStreamOperatorTestHarness<>(
+ new StreamSink<>(kafkaProducer),
+ IntSerializer.INSTANCE);
+
+ testHarness1.setup();
+ testHarness1.open();
+ testHarness1.initializeState(null);
+ testHarness1.processElement(42, 0);
+ testHarness1.snapshot(0, 1);
+ testHarness1.processElement(43, 2);
+ int transactionCoordinatorId = kafkaProducer.getTransactionCoordinatorId();
+ OperatorStateHandles snapshot = testHarness1.snapshot(1, 3);
+
+ failBroker(transactionCoordinatorId);
+
+ try {
+ testHarness1.processElement(44, 4);
+ testHarness1.notifyOfCompletedCheckpoint(1);
+ testHarness1.close();
+ }
+ catch (Exception ex) {
+ // Expected... some random exception could be thrown by any of the above operations.
+ }
+ finally {
+ kafkaServer.restartBroker(transactionCoordinatorId);
+ }
+
+ try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness2 = createTestHarness(topic)) {
+ testHarness2.setup();
+ testHarness2.initializeState(snapshot);
+ testHarness2.open();
+ }
+
+ assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43), 30_000L);
+
+ deleteTestTopic(topic);
+ }
+
+ /**
+ * This test checks whether FlinkKafkaProducer011 correctly aborts lingering transactions after a failure.
+ * If such transactions were left lingering, consumers would be unable to read committed records
+ * that were created after the lingering transaction.
+ */
+ @Test(timeout = 120_000L)
+ public void testFailBeforeNotifyAndResumeWorkAfterwards() throws Exception {
+ String topic = "flink-kafka-producer-fail-before-notify";
+
+ OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic);
+
+ testHarness.setup();
+ testHarness.open();
+ testHarness.processElement(42, 0);
+ testHarness.snapshot(0, 1);
+ testHarness.processElement(43, 2);
+ OperatorStateHandles snapshot1 = testHarness.snapshot(1, 3);
+
+ testHarness.processElement(44, 4);
+ testHarness.snapshot(2, 5);
+ testHarness.processElement(45, 6);
+
+ // do not close the previous testHarness, to make sure that closing does not clean up something (in case of
+ // failure there might not be any close)
+ testHarness = createTestHarness(topic);
+ testHarness.setup();
+ // restore from snapshot1, transactions with records 44 and 45 should be aborted
+ testHarness.initializeState(snapshot1);
+ testHarness.open();
+
+ // write and commit more records, after potentially lingering transactions
+ testHarness.processElement(46, 7);
+ testHarness.snapshot(4, 8);
+ testHarness.processElement(47, 9);
+ testHarness.notifyOfCompletedCheckpoint(4);
+
+ //now we should have:
+ // - records 42 and 43 in committed transactions
+ // - aborted transactions with records 44 and 45
+ // - committed transaction with record 46
+ // - pending transaction with record 47
+ assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43, 46), 30_000L);
+
+ testHarness.close();
+ deleteTestTopic(topic);
+ }
+
+ /**
+ * This test checks whether FlinkKafkaProducer011 correctly aborts lingering transactions after a failure
+ * that happened before the first checkpoint and was followed by reducing the parallelism.
+ * If such transactions were left lingering, consumers would be unable to read committed records
+ * that were created after the lingering transaction.
+ */
+ @Test(timeout = 120_000L)
+ public void testScaleDownBeforeFirstCheckpoint() throws Exception {
+ String topic = "scale-down-before-first-checkpoint";
+
+ List<AutoCloseable> operatorsToClose = new ArrayList<>();
+ int preScaleDownParallelism = Math.max(2, FlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTOR);
+ for (int subtaskIndex = 0; subtaskIndex < preScaleDownParallelism; subtaskIndex++) {
+ OneInputStreamOperatorTestHarness<Integer, Object> preScaleDownOperator = createTestHarness(
+ topic,
+ preScaleDownParallelism,
+ preScaleDownParallelism,
+ subtaskIndex);
+
+ preScaleDownOperator.setup();
+ preScaleDownOperator.open();
+ preScaleDownOperator.processElement(subtaskIndex * 2, 0);
+ preScaleDownOperator.snapshot(0, 1);
+ preScaleDownOperator.processElement(subtaskIndex * 2 + 1, 2);
+
+ operatorsToClose.add(preScaleDownOperator);
+ }
+
+ // do not close the previous testHarnesses, to make sure that closing does not clean up something (in case of
+ // failure there might not be any close)
+
+ // After previous failure simulate restarting application with smaller parallelism
+ OneInputStreamOperatorTestHarness<Integer, Object> postScaleDownOperator1 = createTestHarness(topic, 1, 1, 0);
+
+ postScaleDownOperator1.setup();
+ postScaleDownOperator1.open();
+
+ // write and commit more records, after potentially lingering transactions
+ postScaleDownOperator1.processElement(46, 7);
+ postScaleDownOperator1.snapshot(4, 8);
+ postScaleDownOperator1.processElement(47, 9);
+ postScaleDownOperator1.notifyOfCompletedCheckpoint(4);
+
+ // now we should have:
+ // - records written by the pre-scale-down operators (subtaskIndex * 2 and subtaskIndex * 2 + 1) in aborted transactions
+ // - committed transaction with record 46
+ // - pending transaction with record 47
+ assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(46), 30_000L);
+
+ postScaleDownOperator1.close();
+ // ignore ProducerFencedExceptions, because postScaleDownOperator1 could reuse transactional ids.
+ for (AutoCloseable operatorToClose : operatorsToClose) {
+ closeIgnoringProducerFenced(operatorToClose);
+ }
+ deleteTestTopic(topic);
+ }
+
+ /**
+ * Each instance of FlinkKafkaProducer011 uses its own pool of transactional ids. After the restore from checkpoint
+ * transactional ids are redistributed across the subtasks. In case of scale down, the surplus transactional ids
+ * are dropped. In case of scale up, new ones are generated (for the new subtasks). This test makes sure that a sequence
+ * of scaling down and up again works fine. In particular it checks whether the newly generated ids in scaling up
+ * do not overlap with ids that were used before scaling down. For example we start with 4 ids and parallelism 4:
+ * [1], [2], [3], [4] - one assigned per each subtask
+ * we scale down to parallelism 2:
+ * [1, 2], [3, 4] - first subtask got ids 1 and 2, second got ids 3 and 4
+ * surplus ids are dropped from the pools and we scale up to parallelism 3:
+ * [1 or 2], [3 or 4], [???]
+ * the new subtask has to generate new id(s), but it cannot use ids that are potentially in use, so it has to generate
+ * new ones that are greater than 4.
+ */
+ @Test(timeout = 120_000L)
+ public void testScaleUpAfterScalingDown() throws Exception {
+ String topic = "scale-down-before-first-checkpoint";
+
+ final int parallelism1 = 4;
+ final int parallelism2 = 2;
+ final int parallelism3 = 3;
+ final int maxParallelism = Math.max(parallelism1, Math.max(parallelism2, parallelism3));
+
+ List<OperatorStateHandle> operatorStateHandles = repartitionAndExecute(
+ topic,
+ Collections.emptyList(),
+ parallelism1,
+ maxParallelism,
+ IntStream.range(0, parallelism1).boxed().iterator());
+
+ operatorStateHandles = repartitionAndExecute(
+ topic,
+ operatorStateHandles,
+ parallelism2,
+ maxParallelism,
+ IntStream.range(parallelism1, parallelism1 + parallelism2).boxed().iterator());
+
+ operatorStateHandles = repartitionAndExecute(
+ topic,
+ operatorStateHandles,
+ parallelism3,
+ maxParallelism,
+ IntStream.range(parallelism1 + parallelism2, parallelism1 + parallelism2 + parallelism3).boxed().iterator());
+
+ // After each previous repartitionAndExecute call, we are left with some lingering transactions, that would
+ // not allow us to read all committed messages from the topic. Thus we initialize operators from
+ // operatorStateHandles once more, but without any new data. This should terminate all ongoing transactions.
+
+ operatorStateHandles = repartitionAndExecute(
+ topic,
+ operatorStateHandles,
+ 1,
+ maxParallelism,
+ Collections.emptyIterator());
+
+ assertExactlyOnceForTopic(
+ createProperties(),
+ topic,
+ 0,
+ IntStream.range(0, parallelism1 + parallelism2 + parallelism3).boxed().collect(Collectors.toList()),
+ 30_000L);
+ deleteTestTopic(topic);
+ }
+
+ private List<OperatorStateHandle> repartitionAndExecute(
+ String topic,
+ List<OperatorStateHandle> inputStates,
+ int parallelism,
+ int maxParallelism,
+ Iterator<Integer> inputData) throws Exception {
+
+ List<OperatorStateHandle> outputStates = new ArrayList<>();
+ List<OneInputStreamOperatorTestHarness<Integer, Object>> testHarnesses = new ArrayList<>();
+
+ for (int subtaskIndex = 0; subtaskIndex < parallelism; subtaskIndex++) {
+ OneInputStreamOperatorTestHarness<Integer, Object> testHarness =
+ createTestHarness(topic, maxParallelism, parallelism, subtaskIndex);
+ testHarnesses.add(testHarness);
+
+ testHarness.setup();
+
+ testHarness.initializeState(new OperatorStateHandles(
+ 0,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ inputStates,
+ Collections.emptyList()));
+ testHarness.open();
+
+ if (inputData.hasNext()) {
+ int nextValue = inputData.next();
+ testHarness.processElement(nextValue, 0);
+ OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+
+ outputStates.addAll(snapshot.getManagedOperatorState());
+ checkState(snapshot.getRawOperatorState() == null, "Unexpected raw operator state");
+ checkState(snapshot.getManagedKeyedState() == null, "Unexpected managed keyed state");
+ checkState(snapshot.getRawKeyedState() == null, "Unexpected raw keyed state");
+
+ for (int i = 1; i < FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE - 1; i++) {
+ testHarness.processElement(-nextValue, 0);
+ testHarness.snapshot(i, 0);
+ }
+ }
+ }
+
+ for (OneInputStreamOperatorTestHarness<Integer, Object> testHarness : testHarnesses) {
+ testHarness.close();
+ }
+
+ return outputStates;
+ }
+
+ @Test
+ public void testRecoverCommittedTransaction() throws Exception {
+ String topic = "flink-kafka-producer-recover-committed-transaction";
+
+ OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic);
+
+ testHarness.setup();
+ testHarness.open(); // producerA - start transaction (txn) 0
+ testHarness.processElement(42, 0); // producerA - write 42 in txn 0
+ OperatorStateHandles checkpoint0 = testHarness.snapshot(0, 1); // producerA - pre commit txn 0, producerB - start txn 1
+ testHarness.processElement(43, 2); // producerB - write 43 in txn 1
+ testHarness.notifyOfCompletedCheckpoint(0); // producerA - commit txn 0 and return to the pool
+ testHarness.snapshot(1, 3); // producerB - pre txn 1, producerA - start txn 2
+ testHarness.processElement(44, 4); // producerA - write 44 in txn 2
+ testHarness.close(); // producerA - abort txn 2
+
+ testHarness = createTestHarness(topic);
+ testHarness.initializeState(checkpoint0); // recover state 0 - producerA recover and commit txn 0
+ testHarness.close();
+
+ assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42), 30_000L);
+
+ deleteTestTopic(topic);
+ }
+
+ @Test
+ public void testRunOutOfProducersInThePool() throws Exception {
+ String topic = "flink-kafka-run-out-of-producers";
+
+ try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic)) {
+
+ testHarness.setup();
+ testHarness.open();
+
+ for (int i = 0; i < FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE * 2; i++) {
+ testHarness.processElement(i, i * 2);
+ testHarness.snapshot(i, i * 2 + 1);
+ }
+ }
+ catch (Exception ex) {
+ if (!ex.getCause().getMessage().startsWith("Too many ongoing")) {
+ throw ex;
+ }
+ }
+ deleteTestTopic(topic);
+ }
+
+ // shut down a Kafka broker
+ private void failBroker(int brokerId) {
+ KafkaServer toShutDown = null;
+ for (KafkaServer server : kafkaServer.getBrokers()) {
+
+ if (kafkaServer.getBrokerId(server) == brokerId) {
+ toShutDown = server;
+ break;
+ }
+ }
+
+ if (toShutDown == null) {
+ StringBuilder listOfBrokers = new StringBuilder();
+ for (KafkaServer server : kafkaServer.getBrokers()) {
+ listOfBrokers.append(kafkaServer.getBrokerId(server));
+ listOfBrokers.append(" ; ");
+ }
+
+ throw new IllegalArgumentException("Cannot find broker to shut down: " + brokerId
+ + " ; available brokers: " + listOfBrokers.toString());
+ } else {
+ toShutDown.shutdown();
+ toShutDown.awaitShutdown();
+ }
+ }
+
+ private void assertRecord(String topicName, String expectedKey, String expectedValue) {
+ try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(extraProperties)) {
+ kafkaConsumer.subscribe(Collections.singletonList(topicName));
+ ConsumerRecords<String, String> records = kafkaConsumer.poll(10000);
+
+ ConsumerRecord<String, String> record = Iterables.getOnlyElement(records);
+ assertEquals(expectedKey, record.key());
+ assertEquals(expectedValue, record.value());
+ }
+ }
+
+ private void closeIgnoringProducerFenced(AutoCloseable autoCloseable) throws Exception {
+ try {
+ autoCloseable.close();
+ }
+ catch (Exception ex) {
+ if (!(ex.getCause() instanceof ProducerFencedException)) {
+ throw ex;
+ }
+ }
+ }
+
+ private OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness(String topic) throws Exception {
+ return createTestHarness(topic, 1, 1, 0);
+ }
+
+ private OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness(
+ String topic,
+ int maxParallelism,
+ int parallelism,
+ int subtaskIndex) throws Exception {
+ Properties properties = createProperties();
+
+ FlinkKafkaProducer011<Integer> kafkaProducer = new FlinkKafkaProducer011<>(
+ topic,
+ integerKeyedSerializationSchema,
+ properties,
+ FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
+
+ return new OneInputStreamOperatorTestHarness<>(
+ new StreamSink<>(kafkaProducer),
+ maxParallelism,
+ parallelism,
+ subtaskIndex,
+ IntSerializer.INSTANCE);
+ }
+
+ private Properties createProperties() {
+ Properties properties = new Properties();
+ properties.putAll(standardProps);
+ properties.putAll(secureProps);
+ properties.put(FlinkKafkaProducer011.KEY_DISABLE_METRICS, "true");
+ return properties;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/856b6baf/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
deleted file mode 100644
index 381ba33..0000000
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
+++ /dev/null
@@ -1,519 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
-
-import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
-
-import kafka.server.KafkaServer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.errors.ProducerFencedException;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-import java.util.UUID;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static org.apache.flink.util.Preconditions.checkState;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-/**
- * IT cases for the {@link FlinkKafkaProducer011}.
- */
-@SuppressWarnings("serial")
-public class FlinkKafkaProducer011Tests extends KafkaTestBase {
- protected String transactionalId;
- protected Properties extraProperties;
-
- protected TypeInformationSerializationSchema<Integer> integerSerializationSchema =
- new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
- protected KeyedSerializationSchema<Integer> integerKeyedSerializationSchema =
- new KeyedSerializationSchemaWrapper<>(integerSerializationSchema);
-
- @Before
- public void before() {
- transactionalId = UUID.randomUUID().toString();
- extraProperties = new Properties();
- extraProperties.putAll(standardProps);
- extraProperties.put("transactional.id", transactionalId);
- extraProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- extraProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- extraProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- extraProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- extraProperties.put("isolation.level", "read_committed");
- }
-
- @Test(timeout = 120_000L)
- public void testFlinkKafkaProducer011FailBeforeNotify() throws Exception {
- String topic = "flink-kafka-producer-fail-before-notify";
-
- OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic);
-
- testHarness.setup();
- testHarness.open();
- testHarness.initializeState(null);
- testHarness.processElement(42, 0);
- testHarness.snapshot(0, 1);
- testHarness.processElement(43, 2);
- OperatorStateHandles snapshot = testHarness.snapshot(1, 3);
-
- int leaderId = kafkaServer.getLeaderToShutDown(topic);
- failBroker(leaderId);
-
- try {
- testHarness.processElement(44, 4);
- testHarness.snapshot(2, 5);
- assertFalse(true);
- }
- catch (Exception ex) {
- // expected
- }
- try {
- testHarness.close();
- }
- catch (Exception ex) {
- }
-
- kafkaServer.restartBroker(leaderId);
-
- testHarness = createTestHarness(topic);
- testHarness.setup();
- testHarness.initializeState(snapshot);
- testHarness.close();
-
- assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43), 30_000L);
-
- deleteTestTopic(topic);
- }
-
- @Test(timeout = 120_000L)
- public void testFlinkKafkaProducer011FailTransactionCoordinatorBeforeNotify() throws Exception {
- String topic = "flink-kafka-producer-fail-transaction-coordinator-before-notify";
-
- Properties properties = createProperties();
-
- FlinkKafkaProducer011<Integer> kafkaProducer = new FlinkKafkaProducer011<>(
- topic,
- integerKeyedSerializationSchema,
- properties,
- FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
-
- OneInputStreamOperatorTestHarness<Integer, Object> testHarness1 = new OneInputStreamOperatorTestHarness<>(
- new StreamSink<>(kafkaProducer),
- IntSerializer.INSTANCE);
-
- testHarness1.setup();
- testHarness1.open();
- testHarness1.initializeState(null);
- testHarness1.processElement(42, 0);
- testHarness1.snapshot(0, 1);
- testHarness1.processElement(43, 2);
- int transactionCoordinatorId = kafkaProducer.getTransactionCoordinatorId();
- OperatorStateHandles snapshot = testHarness1.snapshot(1, 3);
-
- failBroker(transactionCoordinatorId);
-
- try {
- testHarness1.processElement(44, 4);
- testHarness1.notifyOfCompletedCheckpoint(1);
- testHarness1.close();
- }
- catch (Exception ex) {
- // Expected... some random exception could be thrown by any of the above operations.
- }
- finally {
- kafkaServer.restartBroker(transactionCoordinatorId);
- }
-
- try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness2 = createTestHarness(topic)) {
- testHarness2.setup();
- testHarness2.initializeState(snapshot);
- testHarness2.open();
- }
-
- assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43), 30_000L);
-
- deleteTestTopic(topic);
- }
-
- /**
- * This tests checks whether FlinkKafkaProducer011 correctly aborts lingering transactions after a failure.
- * If such transactions were left alone lingering it consumers would be unable to read committed records
- * that were created after this lingering transaction.
- */
- @Test(timeout = 120_000L)
- public void testFailBeforeNotifyAndResumeWorkAfterwards() throws Exception {
- String topic = "flink-kafka-producer-fail-before-notify";
-
- OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic);
-
- testHarness.setup();
- testHarness.open();
- testHarness.processElement(42, 0);
- testHarness.snapshot(0, 1);
- testHarness.processElement(43, 2);
- OperatorStateHandles snapshot1 = testHarness.snapshot(1, 3);
-
- testHarness.processElement(44, 4);
- testHarness.snapshot(2, 5);
- testHarness.processElement(45, 6);
-
- // do not close previous testHarness to make sure that closing do not clean up something (in case of failure
- // there might not be any close)
- testHarness = createTestHarness(topic);
- testHarness.setup();
- // restore from snapshot1, transactions with records 44 and 45 should be aborted
- testHarness.initializeState(snapshot1);
- testHarness.open();
-
- // write and commit more records, after potentially lingering transactions
- testHarness.processElement(46, 7);
- testHarness.snapshot(4, 8);
- testHarness.processElement(47, 9);
- testHarness.notifyOfCompletedCheckpoint(4);
-
- //now we should have:
- // - records 42 and 43 in committed transactions
- // - aborted transactions with records 44 and 45
- // - committed transaction with record 46
- // - pending transaction with record 47
- assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43, 46), 30_000L);
-
- testHarness.close();
- deleteTestTopic(topic);
- }
-
- /**
- * This tests checks whether FlinkKafkaProducer011 correctly aborts lingering transactions after a failure,
- * which happened before first checkpoint and was followed up by reducing the parallelism.
- * If such transactions were left alone lingering it consumers would be unable to read committed records
- * that were created after this lingering transaction.
- */
- @Test(timeout = 120_000L)
- public void testScaleDownBeforeFirstCheckpoint() throws Exception {
- String topic = "scale-down-before-first-checkpoint";
-
- List<AutoCloseable> operatorsToClose = new ArrayList<>();
- int preScaleDownParallelism = Math.max(2, FlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTOR);
- for (int subtaskIndex = 0; subtaskIndex < preScaleDownParallelism; subtaskIndex++) {
- OneInputStreamOperatorTestHarness<Integer, Object> preScaleDownOperator = createTestHarness(
- topic,
- preScaleDownParallelism,
- preScaleDownParallelism,
- subtaskIndex);
-
- preScaleDownOperator.setup();
- preScaleDownOperator.open();
- preScaleDownOperator.processElement(subtaskIndex * 2, 0);
- preScaleDownOperator.snapshot(0, 1);
- preScaleDownOperator.processElement(subtaskIndex * 2 + 1, 2);
-
- operatorsToClose.add(preScaleDownOperator);
- }
-
- // do not close previous testHarnesses to make sure that closing do not clean up something (in case of failure
- // there might not be any close)
-
- // After previous failure simulate restarting application with smaller parallelism
- OneInputStreamOperatorTestHarness<Integer, Object> postScaleDownOperator1 = createTestHarness(topic, 1, 1, 0);
-
- postScaleDownOperator1.setup();
- postScaleDownOperator1.open();
-
- // write and commit more records, after potentially lingering transactions
- postScaleDownOperator1.processElement(46, 7);
- postScaleDownOperator1.snapshot(4, 8);
- postScaleDownOperator1.processElement(47, 9);
- postScaleDownOperator1.notifyOfCompletedCheckpoint(4);
-
- //now we should have:
- // - records 42, 43, 44 and 45 in aborted transactions
- // - committed transaction with record 46
- // - pending transaction with record 47
- assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(46), 30_000L);
-
- postScaleDownOperator1.close();
- // ignore ProducerFencedExceptions, because postScaleDownOperator1 could reuse transactional ids.
- for (AutoCloseable operatorToClose : operatorsToClose) {
- closeIgnoringProducerFenced(operatorToClose);
- }
- deleteTestTopic(topic);
- }
-
- /**
- * Each instance of FlinkKafkaProducer011 uses it's own pool of transactional ids. After the restore from checkpoint
- * transactional ids are redistributed across the subtasks. In case of scale down, the surplus transactional ids
- * are dropped. In case of scale up, new one are generated (for the new subtasks). This test make sure that sequence
- * of scaling down and up again works fine. Especially it checks whether the newly generated ids in scaling up
- * do not overlap with ids that were used before scaling down. For example we start with 4 ids and parallelism 4:
- * [1], [2], [3], [4] - one assigned per each subtask
- * we scale down to parallelism 2:
- * [1, 2], [3, 4] - first subtask got id 1 and 2, second got ids 3 and 4
- * surplus ids are dropped from the pools and we scale up to parallelism 3:
- * [1 or 2], [3 or 4], [???]
- * new subtask have to generate new id(s), but he can not use ids that are potentially in use, so it has to generate
- * new ones that are greater then 4.
- */
- @Test(timeout = 120_000L)
- public void testScaleUpAfterScalingDown() throws Exception {
- String topic = "scale-down-before-first-checkpoint";
-
- final int parallelism1 = 4;
- final int parallelism2 = 2;
- final int parallelism3 = 3;
- final int maxParallelism = Math.max(parallelism1, Math.max(parallelism2, parallelism3));
-
- List<OperatorStateHandle> operatorStateHandles = repartitionAndExecute(
- topic,
- Collections.emptyList(),
- parallelism1,
- maxParallelism,
- IntStream.range(0, parallelism1).boxed().iterator());
-
- operatorStateHandles = repartitionAndExecute(
- topic,
- operatorStateHandles,
- parallelism2,
- maxParallelism,
- IntStream.range(parallelism1, parallelism1 + parallelism2).boxed().iterator());
-
- operatorStateHandles = repartitionAndExecute(
- topic,
- operatorStateHandles,
- parallelism3,
- maxParallelism,
- IntStream.range(parallelism1 + parallelism2, parallelism1 + parallelism2 + parallelism3).boxed().iterator());
-
- // After each previous repartitionAndExecute call, we are left with some lingering transactions, that would
- // not allow us to read all committed messages from the topic. Thus we initialize operators from
- // operatorStateHandles once more, but without any new data. This should terminate all ongoing transactions.
-
- operatorStateHandles = repartitionAndExecute(
- topic,
- operatorStateHandles,
- 1,
- maxParallelism,
- Collections.emptyIterator());
-
- assertExactlyOnceForTopic(
- createProperties(),
- topic,
- 0,
- IntStream.range(0, parallelism1 + parallelism2 + parallelism3).boxed().collect(Collectors.toList()),
- 30_000L);
- deleteTestTopic(topic);
- }
-
- private List<OperatorStateHandle> repartitionAndExecute(
- String topic,
- List<OperatorStateHandle> inputStates,
- int parallelism,
- int maxParallelism,
- Iterator<Integer> inputData) throws Exception {
-
- List<OperatorStateHandle> outputStates = new ArrayList<>();
- List<OneInputStreamOperatorTestHarness<Integer, Object>> testHarnesses = new ArrayList<>();
-
- for (int subtaskIndex = 0; subtaskIndex < parallelism; subtaskIndex++) {
- OneInputStreamOperatorTestHarness<Integer, Object> testHarness =
- createTestHarness(topic, maxParallelism, parallelism, subtaskIndex);
- testHarnesses.add(testHarness);
-
- testHarness.setup();
-
- testHarness.initializeState(new OperatorStateHandles(
- 0,
- Collections.emptyList(),
- Collections.emptyList(),
- inputStates,
- Collections.emptyList()));
- testHarness.open();
-
- if (inputData.hasNext()) {
- int nextValue = inputData.next();
- testHarness.processElement(nextValue, 0);
- OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
-
- outputStates.addAll(snapshot.getManagedOperatorState());
- checkState(snapshot.getRawOperatorState() == null, "Unexpected raw operator state");
- checkState(snapshot.getManagedKeyedState() == null, "Unexpected managed keyed state");
- checkState(snapshot.getRawKeyedState() == null, "Unexpected raw keyed state");
-
- for (int i = 1; i < FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE - 1; i++) {
- testHarness.processElement(-nextValue, 0);
- testHarness.snapshot(i, 0);
- }
- }
- }
-
- for (OneInputStreamOperatorTestHarness<Integer, Object> testHarness : testHarnesses) {
- testHarness.close();
- }
-
- return outputStates;
- }
-
- @Test
- public void testRecoverCommittedTransaction() throws Exception {
- String topic = "flink-kafka-producer-recover-committed-transaction";
-
- OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic);
-
- testHarness.setup();
- testHarness.open(); // producerA - start transaction (txn) 0
- testHarness.processElement(42, 0); // producerA - write 42 in txn 0
- OperatorStateHandles checkpoint0 = testHarness.snapshot(0, 1); // producerA - pre commit txn 0, producerB - start txn 1
- testHarness.processElement(43, 2); // producerB - write 43 in txn 1
- testHarness.notifyOfCompletedCheckpoint(0); // producerA - commit txn 0 and return to the pool
- testHarness.snapshot(1, 3); // producerB - pre txn 1, producerA - start txn 2
- testHarness.processElement(44, 4); // producerA - write 44 in txn 2
- testHarness.close(); // producerA - abort txn 2
-
- testHarness = createTestHarness(topic);
- testHarness.initializeState(checkpoint0); // recover state 0 - producerA recover and commit txn 0
- testHarness.close();
-
- assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42), 30_000L);
-
- deleteTestTopic(topic);
- }
-
- @Test
- public void testRunOutOfProducersInThePool() throws Exception {
- String topic = "flink-kafka-run-out-of-producers";
-
- try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic)) {
-
- testHarness.setup();
- testHarness.open();
-
- for (int i = 0; i < FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE * 2; i++) {
- testHarness.processElement(i, i * 2);
- testHarness.snapshot(i, i * 2 + 1);
- }
- }
- catch (Exception ex) {
- if (!ex.getCause().getMessage().startsWith("Too many ongoing")) {
- throw ex;
- }
- }
- deleteTestTopic(topic);
- }
-
- // shut down a Kafka broker
- private void failBroker(int brokerId) {
- KafkaServer toShutDown = null;
- for (KafkaServer server : kafkaServer.getBrokers()) {
-
- if (kafkaServer.getBrokerId(server) == brokerId) {
- toShutDown = server;
- break;
- }
- }
-
- if (toShutDown == null) {
- StringBuilder listOfBrokers = new StringBuilder();
- for (KafkaServer server : kafkaServer.getBrokers()) {
- listOfBrokers.append(kafkaServer.getBrokerId(server));
- listOfBrokers.append(" ; ");
- }
-
- throw new IllegalArgumentException("Cannot find broker to shut down: " + brokerId
- + " ; available brokers: " + listOfBrokers.toString());
- } else {
- toShutDown.shutdown();
- toShutDown.awaitShutdown();
- }
- }
-
- private void assertRecord(String topicName, String expectedKey, String expectedValue) {
- try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(extraProperties)) {
- kafkaConsumer.subscribe(Collections.singletonList(topicName));
- ConsumerRecords<String, String> records = kafkaConsumer.poll(10000);
-
- ConsumerRecord<String, String> record = Iterables.getOnlyElement(records);
- assertEquals(expectedKey, record.key());
- assertEquals(expectedValue, record.value());
- }
- }
-
- private void closeIgnoringProducerFenced(AutoCloseable autoCloseable) throws Exception {
- try {
- autoCloseable.close();
- }
- catch (Exception ex) {
- if (!(ex.getCause() instanceof ProducerFencedException)) {
- throw ex;
- }
- }
- }
-
- private OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness(String topic) throws Exception {
- return createTestHarness(topic, 1, 1, 0);
- }
-
- private OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness(
- String topic,
- int maxParallelism,
- int parallelism,
- int subtaskIndex) throws Exception {
- Properties properties = createProperties();
-
- FlinkKafkaProducer011<Integer> kafkaProducer = new FlinkKafkaProducer011<>(
- topic,
- integerKeyedSerializationSchema,
- properties,
- FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
-
- return new OneInputStreamOperatorTestHarness<>(
- new StreamSink<>(kafkaProducer),
- maxParallelism,
- parallelism,
- subtaskIndex,
- IntSerializer.INSTANCE);
- }
-
- private Properties createProperties() {
- Properties properties = new Properties();
- properties.putAll(standardProps);
- properties.putAll(secureProps);
- properties.put(FlinkKafkaProducer011.KEY_DISABLE_METRICS, "true");
- return properties;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/856b6baf/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
new file mode 100644
index 0000000..ab26f8b
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for our own {@link FlinkKafkaProducer}.
+ */
+@SuppressWarnings("serial")
+public class FlinkKafkaProducerTest extends KafkaTestBase {
+ protected String transactionalId;
+ protected Properties extraProperties;
+
+ @Before
+ public void before() {
+ transactionalId = UUID.randomUUID().toString();
+ extraProperties = new Properties();
+ extraProperties.putAll(standardProps);
+ extraProperties.put("transactional.id", transactionalId);
+ extraProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ extraProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ extraProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ extraProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ extraProperties.put("isolation.level", "read_committed");
+ }
+
+ @Test(timeout = 30000L)
+ public void testHappyPath() throws IOException {
+ String topicName = "flink-kafka-producer-happy-path";
+ try (Producer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties)) {
+ kafkaProducer.initTransactions();
+ kafkaProducer.beginTransaction();
+ kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
+ kafkaProducer.commitTransaction();
+ }
+ assertRecord(topicName, "42", "42");
+ deleteTestTopic(topicName);
+ }
+
+ @Test(timeout = 30000L)
+ public void testResumeTransaction() throws IOException {
+ String topicName = "flink-kafka-producer-resume-transaction";
+ try (FlinkKafkaProducer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties)) {
+ kafkaProducer.initTransactions();
+ kafkaProducer.beginTransaction();
+ kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
+ kafkaProducer.flush();
+ long producerId = kafkaProducer.getProducerId();
+ short epoch = kafkaProducer.getEpoch();
+
+ try (FlinkKafkaProducer<String, String> resumeProducer = new FlinkKafkaProducer<>(extraProperties)) {
+ resumeProducer.resumeTransaction(producerId, epoch);
+ resumeProducer.commitTransaction();
+ }
+
+ assertRecord(topicName, "42", "42");
+
+ // this shouldn't throw - in case of a network split, the old producer might attempt to commit its transaction
+ kafkaProducer.commitTransaction();
+
+ // this shouldn't fail either, for the same reason as above
+ try (FlinkKafkaProducer<String, String> resumeProducer = new FlinkKafkaProducer<>(extraProperties)) {
+ resumeProducer.resumeTransaction(producerId, epoch);
+ resumeProducer.commitTransaction();
+ }
+ }
+ deleteTestTopic(topicName);
+ }
+
+ private void assertRecord(String topicName, String expectedKey, String expectedValue) {
+ try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(extraProperties)) {
+ kafkaConsumer.subscribe(Collections.singletonList(topicName));
+ ConsumerRecords<String, String> records = kafkaConsumer.poll(10000);
+
+ ConsumerRecord<String, String> record = Iterables.getOnlyElement(records);
+ assertEquals(expectedKey, record.key());
+ assertEquals(expectedValue, record.value());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/856b6baf/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTests.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTests.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTests.java
deleted file mode 100644
index 18bbd8f..0000000
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTests.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
-
-import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Properties;
-import java.util.UUID;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests for our own {@link FlinkKafkaProducer}.
- */
-@SuppressWarnings("serial")
-public class FlinkKafkaProducerTests extends KafkaTestBase {
- protected String transactionalId;
- protected Properties extraProperties;
-
- @Before
- public void before() {
- transactionalId = UUID.randomUUID().toString();
- extraProperties = new Properties();
- extraProperties.putAll(standardProps);
- extraProperties.put("transactional.id", transactionalId);
- extraProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- extraProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- extraProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- extraProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- extraProperties.put("isolation.level", "read_committed");
- }
-
- @Test(timeout = 30000L)
- public void testHappyPath() throws IOException {
- String topicName = "flink-kafka-producer-happy-path";
- try (Producer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties)) {
- kafkaProducer.initTransactions();
- kafkaProducer.beginTransaction();
- kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
- kafkaProducer.commitTransaction();
- }
- assertRecord(topicName, "42", "42");
- deleteTestTopic(topicName);
- }
-
- @Test(timeout = 30000L)
- public void testResumeTransaction() throws IOException {
- String topicName = "flink-kafka-producer-resume-transaction";
- try (FlinkKafkaProducer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties)) {
- kafkaProducer.initTransactions();
- kafkaProducer.beginTransaction();
- kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
- kafkaProducer.flush();
- long producerId = kafkaProducer.getProducerId();
- short epoch = kafkaProducer.getEpoch();
-
- try (FlinkKafkaProducer<String, String> resumeProducer = new FlinkKafkaProducer<>(extraProperties)) {
- resumeProducer.resumeTransaction(producerId, epoch);
- resumeProducer.commitTransaction();
- }
-
- assertRecord(topicName, "42", "42");
-
- // this shouldn't throw - in case of network split, old producer might attempt to commit it's transaction
- kafkaProducer.commitTransaction();
-
- // this shouldn't fail also, for same reason as above
- try (FlinkKafkaProducer<String, String> resumeProducer = new FlinkKafkaProducer<>(extraProperties)) {
- resumeProducer.resumeTransaction(producerId, epoch);
- resumeProducer.commitTransaction();
- }
- }
- deleteTestTopic(topicName);
- }
-
- private void assertRecord(String topicName, String expectedKey, String expectedValue) {
- try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(extraProperties)) {
- kafkaConsumer.subscribe(Collections.singletonList(topicName));
- ConsumerRecords<String, String> records = kafkaConsumer.poll(10000);
-
- ConsumerRecord<String, String> record = Iterables.getOnlyElement(records);
- assertEquals(expectedKey, record.key());
- assertEquals(expectedValue, record.value());
- }
- }
-}