You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by jq...@apache.org on 2019/10/29 05:45:55 UTC

[flink] branch release-1.9 updated (7bed1e2 -> 9db9543)

This is an automated email from the ASF dual-hosted git repository.

jqin pushed a change to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 7bed1e2  [FLINK-14235][kafka,tests] Change source in at-least-once test from finite to infinite
     new 34bab6f  [FLINK-14370][kafka][test-stability] Fix the cascading test failure in KafkaProducerTestBase.
     new 9db9543  [hotfix][kafka][test-stability] Accelerate the KafkaProducerTest by reducing the timeout values.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../streaming/connectors/kafka/KafkaProducerTestBase.java     | 11 +++++++----
 .../flink/streaming/connectors/kafka/KafkaTestBase.java       |  4 ++++
 2 files changed, 11 insertions(+), 4 deletions(-)


[flink] 01/02: [FLINK-14370][kafka][test-stability] Fix the cascading test failure in KafkaProducerTestBase.

Posted by jq...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 34bab6f577a5f91edadc93aeae76eefb913ba846
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Sat Oct 26 18:56:59 2019 +0800

    [FLINK-14370][kafka][test-stability] Fix the cascading test failure in KafkaProducerTestBase.
---
 .../flink/streaming/connectors/kafka/KafkaProducerTestBase.java    | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 311e515..d89e8ad 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -280,13 +280,12 @@ public abstract class KafkaProducerTestBase extends KafkaTestBaseWithFlink {
 		try {
 			env.execute("One-to-one at least once test");
 			fail("Job should fail!");
-		}
-		catch (JobExecutionException ex) {
+		} catch (JobExecutionException ex) {
 			// ignore error, it can be one of many errors so it would be hard to check the exception message/cause
+		} finally {
+			kafkaServer.unblockProxyTraffic();
 		}
 
-		kafkaServer.unblockProxyTraffic();
-
 		// assert that before failure we successfully snapshot/flushed all expected elements
 		assertAtLeastOnceForTopic(
 				properties,


[flink] 02/02: [hotfix][kafka][test-stability] Accelerate the KafkaProducerTest by reducing the timeout values.

Posted by jq...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9db954365ac0ac317fdda4c11f9204c5df3aaebc
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Sat Oct 26 18:55:44 2019 +0800

    [hotfix][kafka][test-stability] Accelerate the KafkaProducerTest by reducing the timeout values.
---
 .../flink/streaming/connectors/kafka/KafkaProducerTestBase.java       | 4 ++++
 .../org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java    | 4 ++++
 2 files changed, 8 insertions(+)

diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index d89e8ad..1d7630a 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -245,6 +245,10 @@ public abstract class KafkaProducerTestBase extends KafkaTestBaseWithFlink {
 		properties.putAll(secureProps);
 		// decrease timeout and block time from 60s down to 10s - this is how long KafkaProducer will try send pending (not flushed) data on close()
 		properties.setProperty("timeout.ms", "10000");
+		// KafkaProducer prior to KIP-91 (release 2.1) uses request timeout to expire the unsent records.
+		properties.setProperty("request.timeout.ms", "3000");
+		// KafkaProducer in 2.1.0 and above uses delivery timeout to expire the the records.
+		properties.setProperty("delivery.timeout.ms", "5000");
 		properties.setProperty("max.block.ms", "10000");
 		// increase batch.size and linger.ms - this tells KafkaProducer to batch produced events instead of flushing them immediately
 		properties.setProperty("batch.size", "10240000");
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index c86dd08..b85481f 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -222,6 +222,10 @@ public abstract class KafkaTestBase extends TestLogger {
 		while (System.currentTimeMillis() < startMillis + timeoutMillis) {
 			properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
 			properties.put("value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
+			// We need to set these two properties so that they are lower than request.timeout.ms. This is
+			// required for some old KafkaConsumer versions.
+			properties.put("session.timeout.ms", "2000");
+			properties.put("heartbeat.interval.ms", "500");
 
 			// query kafka for new records ...
 			Collection<ConsumerRecord<Integer, Integer>> records = kafkaServer.getAllRecordsFromTopic(properties, topic, partition, 100);