Posted to commits@flink.apache.org by jq...@apache.org on 2019/10/29 05:45:32 UTC

[flink] branch release-1.8 updated (e0387a8 -> 443ffae)

This is an automated email from the ASF dual-hosted git repository.

jqin pushed a change to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from e0387a8  [FLINK-14235][kafka,tests] Change source in at-least-once test from finite to infinite
     new cf7509b  [FLINK-14370][kafka][test-stability] Fix the cascading test failure in KafkaProducerTestBase.
     new 443ffae  [hotfix][kafka][test-stability] Accelerate the KafkaProducerTest by reducing the timeout values.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../streaming/connectors/kafka/KafkaProducerTestBase.java     | 11 +++++++----
 .../flink/streaming/connectors/kafka/KafkaTestBase.java       |  4 ++++
 2 files changed, 11 insertions(+), 4 deletions(-)


[flink] 02/02: [hotfix][kafka][test-stability] Accelerate the KafkaProducerTest by reducing the timeout values.

Posted by jq...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 443ffaeb6c828be2f2ad1c434e9873341c44bcef
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Sat Oct 26 18:55:44 2019 +0800

    [hotfix][kafka][test-stability] Accelerate the KafkaProducerTest by reducing the timeout values.
---
 .../flink/streaming/connectors/kafka/KafkaProducerTestBase.java       | 4 ++++
 .../org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java    | 4 ++++
 2 files changed, 8 insertions(+)

diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index d89e8ad..1d7630a 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -245,6 +245,10 @@ public abstract class KafkaProducerTestBase extends KafkaTestBaseWithFlink {
 		properties.putAll(secureProps);
 		// decrease timeout and block time from 60s down to 10s - this is how long KafkaProducer will try to send pending (not flushed) data on close()
 		properties.setProperty("timeout.ms", "10000");
+		// KafkaProducer prior to KIP-91 (release 2.1) uses request timeout to expire the unsent records.
+		properties.setProperty("request.timeout.ms", "3000");
+		// KafkaProducer in 2.1.0 and above uses delivery timeout to expire the records.
+		properties.setProperty("delivery.timeout.ms", "5000");
 		properties.setProperty("max.block.ms", "10000");
 		// increase batch.size and linger.ms - this tells KafkaProducer to batch produced events instead of flushing them immediately
 		properties.setProperty("batch.size", "10240000");
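
For readers outside the Flink test harness, the producer-side timeouts touched above translate roughly into the standalone configuration sketched below. This is a minimal, hypothetical example and not part of the commit: the bootstrap address, topic name and serializer choices are assumptions; only the three timeout properties mirror the change.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ProducerTimeoutSketch {

        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
            properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
            properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");

            // Pre-KIP-91 clients (< 2.1) expire unsent records via the request timeout.
            properties.setProperty("request.timeout.ms", "3000");
            // Clients 2.1.0 and above expire records via the end-to-end delivery timeout
            // (must be >= linger.ms + request.timeout.ms, which holds here with linger.ms = 0).
            properties.setProperty("delivery.timeout.ms", "5000");
            // Upper bound for blocking calls such as send() when the buffer is full.
            properties.setProperty("max.block.ms", "10000");

            try (KafkaProducer<Integer, Integer> producer = new KafkaProducer<>(properties)) {
                producer.send(new ProducerRecord<>("test-topic", 1, 42)); // assumed topic
            } // close() will not hang past what the configured timeouts allow
        }
    }
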
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index d23523f..9527694 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -210,6 +210,10 @@ public abstract class KafkaTestBase extends TestLogger {
 		while (System.currentTimeMillis() < startMillis + timeoutMillis) {
 			properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
 			properties.put("value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
+			// We need to set these two properties so that they are lower than request.timeout.ms. This is
+			// required for some old KafkaConsumer versions.
+			properties.put("session.timeout.ms", "2000");
+			properties.put("heartbeat.interval.ms", "500");
 
 			// query kafka for new records ...
 			Collection<ConsumerRecord<Integer, Integer>> records = kafkaServer.getAllRecordsFromTopic(properties, topic, partition, 100);
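
For reference, the consumer-side relationship between the timeouts can be seen in the standalone sketch below. It is a hypothetical example and not part of the commit: the bootstrap address, group id and topic are assumptions, and it presumes the broker's group.min.session.timeout.ms permits a 2 s session timeout; only the two properties added above, kept below request.timeout.ms and with the heartbeat interval below the session timeout, mirror the change.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ConsumerTimeoutSketch {

        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "localhost:9092"); // assumed broker address
            properties.put("group.id", "timeout-sketch");          // assumed group id
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");

            // Both values stay below the consumer's request.timeout.ms (30 s by default),
            // and the heartbeat interval stays well below the session timeout.
            properties.put("session.timeout.ms", "2000");
            properties.put("heartbeat.interval.ms", "500");

            try (KafkaConsumer<Integer, Integer> consumer = new KafkaConsumer<>(properties)) {
                consumer.subscribe(Collections.singletonList("test-topic")); // assumed topic
                ConsumerRecords<Integer, Integer> records = consumer.poll(Duration.ofMillis(100));
                System.out.println("Fetched " + records.count() + " records");
            }
        }
    }
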


[flink] 01/02: [FLINK-14370][kafka][test-stability] Fix the cascading test failure in KafkaProducerTestBase.

Posted by jq...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cf7509b91888e4c6b64eb514fbb62af49533e0f0
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Sat Oct 26 18:56:59 2019 +0800

    [FLINK-14370][kafka][test-stability] Fix the cascading test failure in KafkaProducerTestBase.
---
 .../flink/streaming/connectors/kafka/KafkaProducerTestBase.java    | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 311e515..d89e8ad 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -280,13 +280,12 @@ public abstract class KafkaProducerTestBase extends KafkaTestBaseWithFlink {
 		try {
 			env.execute("One-to-one at least once test");
 			fail("Job should fail!");
-		}
-		catch (JobExecutionException ex) {
+		} catch (JobExecutionException ex) {
 			// ignore error, it can be one of many errors so it would be hard to check the exception message/cause
+		} finally {
+			kafkaServer.unblockProxyTraffic();
 		}
 
-		kafkaServer.unblockProxyTraffic();
-
 		// assert that before failure we successfully snapshot/flushed all expected elements
 		assertAtLeastOnceForTopic(
 				properties,
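

The point of the refactoring above is that the proxy is unblocked on every exit path: when the job fails as expected, when fail() throws because the job unexpectedly succeeded, and when some other error escapes. A minimal illustration of that try/catch/finally pattern is sketched below; run(), NetworkBlocker and its methods are hypothetical stand-ins, not Flink or Kafka test APIs.

    // Hypothetical stand-in for the proxy used in the test; not a Flink/Kafka API.
    interface NetworkBlocker {
        void blockTraffic();
        void unblockTraffic();
    }

    final class CleanupPatternSketch {

        static void runExpectingFailure(Runnable job, NetworkBlocker blocker) {
            blocker.blockTraffic();
            try {
                job.run();
                throw new AssertionError("Job should fail!");
            } catch (RuntimeException expected) {
                // Expected path: the blocked traffic makes the job fail; the concrete
                // exception varies, so it is not inspected further.
            } finally {
                // Runs whether the job failed as expected, succeeded unexpectedly,
                // or threw something else, so later tests never see a blocked proxy.
                blocker.unblockTraffic();
            }
        }
    }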