You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/02 17:01:15 UTC

[4/4] flink git commit: [hotfix] Make failover region topological sorted

[hotfix] Make failover region topological sorted


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3ff91be1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3ff91be1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3ff91be1

Branch: refs/heads/master
Commit: 3ff91be1d49cf9f972ad5f1c556af173d97d102e
Parents: 3b0fb26
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Oct 26 18:22:43 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Nov 2 17:04:45 2017 +0100

----------------------------------------------------------------------
 .../executiongraph/failover/RestartPipelinedRegionStrategy.java | 5 +++--
 .../apache/flink/runtime/executiongraph/FailoverRegionTest.java | 1 +
 2 files changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3ff91be1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java
index 1884d1c..b8f6964 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java
@@ -166,8 +166,9 @@ public class RestartPipelinedRegionStrategy extends FailoverStrategy {
 									if (predecessorRegion != thisRegion) {
 
 										// we need to merge our region and the predecessor's region
-										thisRegion.addAll(predecessorRegion);
-										distinctRegions.remove(predecessorRegion);
+										predecessorRegion.addAll(thisRegion);
+										distinctRegions.remove(thisRegion);
+										thisRegion = predecessorRegion;
 
 										// remap the vertices from that merged region
 										for (ExecutionVertex inPredRegion: predecessorRegion) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3ff91be1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
index f1e0f7c..4d53e67 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
@@ -422,6 +422,7 @@ public class FailoverRegionTest extends TestLogger {
 		v2.setInvokableClass(AbstractInvokable.class);
 		v3.setInvokableClass(AbstractInvokable.class);
 
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
 		v3.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
 		v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);