You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/11/02 10:00:00 UTC
[02/11] flink git commit: [hotfix] [kafka-tests] Clean up FlinkKafkaProducer011Tests
[hotfix] [kafka-tests] Clean up FlinkKafkaProducer011Tests
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7d3589e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7d3589e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7d3589e
Branch: refs/heads/master
Commit: b7d3589e7732b6458ac4c0ad936666d670cac87b
Parents: 8cdf2ff
Author: gyao <ga...@data-artisans.com>
Authored: Thu Oct 26 19:25:35 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Nov 2 12:32:28 2017 +0800
----------------------------------------------------------------------
.../kafka/FlinkKafkaProducer011Tests.java | 49 +-------------------
1 file changed, 1 insertion(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b7d3589e/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
index 69c3ceb..381ba33 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
@@ -36,13 +35,10 @@ import kafka.server.KafkaServer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.junit.Before;
import org.junit.Test;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -68,7 +64,7 @@ public class FlinkKafkaProducer011Tests extends KafkaTestBase {
protected TypeInformationSerializationSchema<Integer> integerSerializationSchema =
new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
protected KeyedSerializationSchema<Integer> integerKeyedSerializationSchema =
- new KeyedSerializationSchemaWrapper(integerSerializationSchema);
+ new KeyedSerializationSchemaWrapper<>(integerSerializationSchema);
@Before
public void before() {
@@ -83,49 +79,6 @@ public class FlinkKafkaProducer011Tests extends KafkaTestBase {
extraProperties.put("isolation.level", "read_committed");
}
- @Test(timeout = 30000L)
- public void testHappyPath() throws IOException {
- String topicName = "flink-kafka-producer-happy-path";
- try (Producer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties)) {
- kafkaProducer.initTransactions();
- kafkaProducer.beginTransaction();
- kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
- kafkaProducer.commitTransaction();
- }
- assertRecord(topicName, "42", "42");
- deleteTestTopic(topicName);
- }
-
- @Test(timeout = 30000L)
- public void testResumeTransaction() throws IOException {
- String topicName = "flink-kafka-producer-resume-transaction";
- try (FlinkKafkaProducer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties)) {
- kafkaProducer.initTransactions();
- kafkaProducer.beginTransaction();
- kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42"));
- kafkaProducer.flush();
- long producerId = kafkaProducer.getProducerId();
- short epoch = kafkaProducer.getEpoch();
-
- try (FlinkKafkaProducer<String, String> resumeProducer = new FlinkKafkaProducer<>(extraProperties)) {
- resumeProducer.resumeTransaction(producerId, epoch);
- resumeProducer.commitTransaction();
- }
-
- assertRecord(topicName, "42", "42");
-
- // this shouldn't throw - in case of network split, old producer might attempt to commit it's transaction
- kafkaProducer.commitTransaction();
-
- // this shouldn't fail also, for same reason as above
- try (FlinkKafkaProducer<String, String> resumeProducer = new FlinkKafkaProducer<>(extraProperties)) {
- resumeProducer.resumeTransaction(producerId, epoch);
- resumeProducer.commitTransaction();
- }
- }
- deleteTestTopic(topicName);
- }
-
@Test(timeout = 120_000L)
public void testFlinkKafkaProducer011FailBeforeNotify() throws Exception {
String topic = "flink-kafka-producer-fail-before-notify";