You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2016/06/09 13:18:28 UTC

flink git commit: [FLINK-3530] Fix Kafka08 instability: Avoid restarts from SuccessException

Repository: flink
Updated Branches:
  refs/heads/master 6cc1c179a -> ccc86e9f1


[FLINK-3530] Fix Kafka08 instability: Avoid restarts from SuccessException

This closes #2080


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ccc86e9f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ccc86e9f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ccc86e9f

Branch: refs/heads/master
Commit: ccc86e9f1cddebc731ac1ccabdd469df11d72d8b
Parents: 6cc1c17
Author: Robert Metzger <rm...@apache.org>
Authored: Tue Jun 7 18:20:17 2016 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Jun 9 15:18:09 2016 +0200

----------------------------------------------------------------------
 .../streaming/connectors/kafka/KafkaConsumerTestBase.java     | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ccc86e9f/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 3ba8cff..660f24c 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -357,7 +357,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
 		env.enableCheckpointing(500);
 		env.setParallelism(parallelism);
-		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
 		env.getConfig().disableSysoutLogging();
 
 		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps);
@@ -402,7 +402,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
 		env.enableCheckpointing(500);
 		env.setParallelism(parallelism);
-		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
 		env.getConfig().disableSysoutLogging();
 
 		FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps);
@@ -447,7 +447,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
 		env.enableCheckpointing(500);
 		env.setParallelism(parallelism);
-		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+		// set the number of restarts to one. The failing mapper will fail once, then it's only success exceptions.
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
 		env.getConfig().disableSysoutLogging();
 		env.setBufferTimeout(0);