You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/11/02 10:00:00 UTC

[02/11] flink git commit: [hotfix] [kafka-tests] Clean up FlinkKafkaProducer011Tests

[hotfix] [kafka-tests] Clean up FlinkKafkaProducer011Tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7d3589e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7d3589e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7d3589e

Branch: refs/heads/master
Commit: b7d3589e7732b6458ac4c0ad936666d670cac87b
Parents: 8cdf2ff
Author: gyao <ga...@data-artisans.com>
Authored: Thu Oct 26 19:25:35 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Nov 2 12:32:28 2017 +0800

----------------------------------------------------------------------
 .../kafka/FlinkKafkaProducer011Tests.java       | 49 +-------------------
 1 file changed, 1 insertion(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b7d3589e/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
index 69c3ceb..381ba33 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
@@ -36,13 +35,10 @@ import kafka.server.KafkaServer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -68,7 +64,7 @@ public class FlinkKafkaProducer011Tests extends KafkaTestBase {
 	protected TypeInformationSerializationSchema<Integer> integerSerializationSchema =
 			new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
 	protected KeyedSerializationSchema<Integer> integerKeyedSerializationSchema =
-			new KeyedSerializationSchemaWrapper(integerSerializationSchema);
+			new KeyedSerializationSchemaWrapper<>(integerSerializationSchema);
 
 	@Before
 	public void before() {
@@ -83,49 +79,6 @@ public class FlinkKafkaProducer011Tests extends KafkaTestBase {
 		extraProperties.put("isolation.level", "read_committed");
 	}
 
-	@Test(timeout = 30000L)
-	public void testHappyPath() throws IOException {
-		String topicName = "flink-kafka-producer-happy-path";
-		try (Producer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties)) {
-			kafkaProducer.initTransactions();
-			kafkaProducer.beginTransaction();
-			kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
-			kafkaProducer.commitTransaction();
-		}
-		assertRecord(topicName, "42", "42");
-		deleteTestTopic(topicName);
-	}
-
-	@Test(timeout = 30000L)
-	public void testResumeTransaction() throws IOException {
-		String topicName = "flink-kafka-producer-resume-transaction";
-		try (FlinkKafkaProducer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties)) {
-			kafkaProducer.initTransactions();
-			kafkaProducer.beginTransaction();
-			kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
-			kafkaProducer.flush();
-			long producerId = kafkaProducer.getProducerId();
-			short epoch = kafkaProducer.getEpoch();
-
-			try (FlinkKafkaProducer<String, String> resumeProducer = new FlinkKafkaProducer<>(extraProperties)) {
-				resumeProducer.resumeTransaction(producerId, epoch);
-				resumeProducer.commitTransaction();
-			}
-
-			assertRecord(topicName, "42", "42");
-
-			// this shouldn't throw - in case of network split, old producer might attempt to commit it's transaction
-			kafkaProducer.commitTransaction();
-
-			// this shouldn't fail also, for same reason as above
-			try (FlinkKafkaProducer<String, String> resumeProducer = new FlinkKafkaProducer<>(extraProperties)) {
-				resumeProducer.resumeTransaction(producerId, epoch);
-				resumeProducer.commitTransaction();
-			}
-		}
-		deleteTestTopic(topicName);
-	}
-
 	@Test(timeout = 120_000L)
 	public void testFlinkKafkaProducer011FailBeforeNotify() throws Exception {
 		String topic = "flink-kafka-producer-fail-before-notify";