You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/07/20 13:52:19 UTC

[1/3] flink git commit: [FLINK-6665] [FLINK-6667] [distributed coordination] Use a callback and a ScheduledExecutor for ExecutionGraph restarts

Repository: flink
Updated Branches:
  refs/heads/release-1.3 129a82fba -> 39f5b1144


[FLINK-6665] [FLINK-6667] [distributed coordination] Use a callback and a ScheduledExecutor for ExecutionGraph restarts

Initial work by zjureel@gmail.com , improved by sewen@apache.org.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c0803d5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c0803d5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c0803d5

Branch: refs/heads/release-1.3
Commit: 6c0803d5e607d8e0e647a2559570fedf7968a9dc
Parents: 129a82f
Author: zjureel <zj...@gmail.com>
Authored: Tue Jul 18 19:27:56 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jul 20 15:10:17 2017 +0200

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  |   7 +-
 .../restart/ExecutionGraphRestartCallback.java  |  51 ++++++++
 .../restart/ExecutionGraphRestarter.java        |  45 -------
 .../restart/FailureRateRestartStrategy.java     |  15 ++-
 .../restart/FixedDelayRestartStrategy.java      |  15 ++-
 .../restart/NoRestartStrategy.java              |   5 +-
 .../executiongraph/restart/RestartCallback.java |  32 +++++
 .../executiongraph/restart/RestartStrategy.java |  12 +-
 .../ExecutionGraphMetricsTest.java              |  17 +--
 .../ExecutionGraphRestartTest.java              |  51 ++------
 .../executiongraph/ExecutionGraphTestUtils.java |  11 +-
 .../restart/FailureRateRestartStrategyTest.java | 128 +++++++++++++++++++
 .../restart/FixedDelayRestartStrategyTest.java  |  77 ++++++++---
 .../restart/InfiniteDelayRestartStrategy.java   |  10 +-
 .../restart/LatchedRestarter.java               |  38 ++++++
 .../executiongraph/restart/NoOpRestarter.java   |  28 ++++
 16 files changed, 407 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 7c13936..9f29faf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -44,11 +44,14 @@ import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
 import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
+import org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback;
+import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
@@ -1388,7 +1391,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 				if (isRestartable && transitionState(currentState, JobStatus.RESTARTING)) {
 					LOG.info("Restarting the job {} ({}).", getJobName(), getJobID());
-					restartStrategy.restart(this);
+
+					RestartCallback restarter = new ExecutionGraphRestartCallback(this);
+					restartStrategy.restart(restarter, new ScheduledExecutorServiceAdapter(futureExecutor));
 
 					return true;
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java
new file mode 100644
index 0000000..5874f91
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link RestartCallback} that abstracts restart calls on an {@link ExecutionGraph}. 
+ * 
+ * <p>This callback implementation is one-shot; it can only be used once.
+ */
+public class ExecutionGraphRestartCallback implements RestartCallback {
+
+	/** The ExecutionGraph to restart */
+	private final ExecutionGraph execGraph;
+
+	/** Atomic flag to make sure this is used only once */
+	private final AtomicBoolean used;
+
+	public ExecutionGraphRestartCallback(ExecutionGraph execGraph) {
+		this.execGraph = checkNotNull(execGraph);
+		this.used = new AtomicBoolean(false);
+	}
+
+	@Override
+	public void triggerFullRecovery() {
+		if (used.compareAndSet(false, true)) {
+			execGraph.restart();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestarter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestarter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestarter.java
deleted file mode 100644
index 9287694..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestarter.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph.restart;
-
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.Callable;
-
-class ExecutionGraphRestarter {
-	private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraphRestarter.class);
-	public static Callable<Object> restartWithDelay(final ExecutionGraph executionGraph, final long delayBetweenRestartAttemptsInMillis) {
-		return new Callable<Object>() {
-			@Override
-			public Object call() throws Exception {
-				try {
-					LOG.info("Delaying retry of job execution for {} ms ...", delayBetweenRestartAttemptsInMillis);
-					// do the delay
-					Thread.sleep(delayBetweenRestartAttemptsInMillis);
-				} catch(InterruptedException e) {
-					// should only happen on shutdown
-				}
-				executionGraph.restart();
-				return null;
-			}
-		};
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
index d95e1c3..6580ec3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.executiongraph.restart;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.util.Preconditions;
 
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
  * with a fixed time delay in between.
  */
 public class FailureRateRestartStrategy implements RestartStrategy {
+
 	private final Time failuresInterval;
 	private final Time delayInterval;
 	private final int maxFailuresPerInterval;
@@ -66,16 +67,22 @@ public class FailureRateRestartStrategy implements RestartStrategy {
 	}
 
 	@Override
-	public void restart(final ExecutionGraph executionGraph) {
+	public void restart(final RestartCallback restarter, ScheduledExecutor executor) {
 		if (isRestartTimestampsQueueFull()) {
 			restartTimestampsDeque.remove();
 		}
 		restartTimestampsDeque.add(System.currentTimeMillis());
-		FlinkFuture.supplyAsync(ExecutionGraphRestarter.restartWithDelay(executionGraph, delayInterval.toMilliseconds()), executionGraph.getFutureExecutor());
+
+		executor.schedule(new Runnable() {
+			@Override
+			public void run() {
+				restarter.triggerFullRecovery();
+			}
+		}, delayInterval.getSize(), delayInterval.getUnit());
 	}
 
 	private boolean isRestartTimestampsQueueFull() {
-		return restartTimestampsDeque.size() == maxFailuresPerInterval;
+		return restartTimestampsDeque.size() >= maxFailuresPerInterval;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
index f51ea7c..0dd700a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
@@ -20,16 +20,19 @@ package org.apache.flink.runtime.executiongraph.restart;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.util.Preconditions;
 import scala.concurrent.duration.Duration;
 
+import java.util.concurrent.TimeUnit;
+
 /**
  * Restart strategy which tries to restart the given {@link ExecutionGraph} a fixed number of times
  * with a fixed time delay in between.
  */
 public class FixedDelayRestartStrategy implements RestartStrategy {
+
 	private final int maxNumberRestartAttempts;
 	private final long delayBetweenRestartAttempts;
 	private int currentRestartAttempt;
@@ -56,9 +59,15 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
 	}
 
 	@Override
-	public void restart(final ExecutionGraph executionGraph) {
+	public void restart(final RestartCallback restarter, ScheduledExecutor executor) {
 		currentRestartAttempt++;
-		FlinkFuture.supplyAsync(ExecutionGraphRestarter.restartWithDelay(executionGraph, delayBetweenRestartAttempts), executionGraph.getFutureExecutor());
+
+		executor.schedule(new Runnable() {
+			@Override
+			public void run() {
+				restarter.triggerFullRecovery();
+			}
+		}, delayBetweenRestartAttempts, TimeUnit.MILLISECONDS);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
index 958d9ac..5502d2d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph.restart;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 
 /**
@@ -32,8 +33,8 @@ public class NoRestartStrategy implements RestartStrategy {
 	}
 
 	@Override
-	public void restart(ExecutionGraph executionGraph) {
-		throw new RuntimeException("NoRestartStrategy does not support restart.");
+	public void restart(RestartCallback restarter, ScheduledExecutor executor) {
+		throw new UnsupportedOperationException("NoRestartStrategy does not support restart.");
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartCallback.java
new file mode 100644
index 0000000..6499be3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartCallback.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+/**
+ * A callback to trigger restarts, passed to the {@link RestartStrategy} to
+ * trigger recovery on the ExecutionGraph. 
+ */
+public interface RestartCallback {
+
+	/**
+	 * Triggers a full recovery in the target ExecutionGraph.
+	 * A full recovery resets all vertices to the state of the latest checkpoint.
+	 */
+	void triggerFullRecovery();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
index 2880c01..60e2e8b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph.restart;
 
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 
 /**
@@ -33,9 +34,14 @@ public interface RestartStrategy {
 	boolean canRestart();
 
 	/**
-	 * Restarts the given {@link ExecutionGraph}.
+	 * Called by the ExecutionGraph to eventually trigger a full recovery.
+	 * The recovery must be triggered on the given callback object, and may be delayed
+	 * with the help of the given scheduled executor.
+	 * 
+	 * <p>The thread that calls this method is not supposed to block/sleep.
 	 *
-	 * @param executionGraph The ExecutionGraph to be restarted
+	 * @param restarter The hook to restart the ExecutionGraph
+	 * @param executor A scheduled executor to delay the restart
 	 */
-	void restart(ExecutionGraph executionGraph);
+	void restart(RestartCallback restarter, ScheduledExecutor executor);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 6b5ceae..0785a26 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -24,11 +24,13 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
+import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SimpleSlot;
@@ -255,25 +257,20 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 
 	static class TestingRestartStrategy implements RestartStrategy {
 
-		private boolean restartable = true;
-		private ExecutionGraph executionGraph = null;
+		private RestartCallback restarter;
 
 		@Override
 		public boolean canRestart() {
-			return restartable;
+			return true;
 		}
 
 		@Override
-		public void restart(ExecutionGraph executionGraph) {
-			this.executionGraph = executionGraph;
-		}
-
-		public void setRestartable(boolean restartable) {
-			this.restartable = restartable;
+		public void restart(RestartCallback restarter, ScheduledExecutor executor) {
+			this.restarter = restarter;
 		}
 
 		public void restartExecutionGraph() {
-			executionGraph.restart();
+			restarter.triggerFullRecovery();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index eeb6c69..81702a2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -18,21 +18,24 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import akka.dispatch.Futures;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
-import org.apache.flink.runtime.executiongraph.restart.FailureRateRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -41,6 +44,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.SerializedValue;
@@ -56,11 +60,9 @@ import scala.concurrent.impl.Promise;
 
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -159,34 +161,6 @@ public class ExecutionGraphRestartTest extends TestLogger {
 	}
 
 	@Test
-	public void taskShouldFailWhenFailureRateLimitExceeded() throws Exception {
-		FailureRateRestartStrategy restartStrategy = new FailureRateRestartStrategy(2, Time.of(10, TimeUnit.SECONDS), Time.of(0, TimeUnit.SECONDS));
-		FiniteDuration timeout = new FiniteDuration(2, TimeUnit.SECONDS);
-		Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createExecutionGraph(restartStrategy);
-		ExecutionGraph eg = executionGraphInstanceTuple.f0;
-
-		restartAfterFailure(eg, timeout, false);
-		restartAfterFailure(eg, timeout, false);
-		makeAFailureAndWait(eg, timeout);
-		//failure rate limit exceeded, so task should be failed
-		assertEquals(JobStatus.FAILED, eg.getState());
-	}
-
-	@Test
-	public void taskShouldNotFailWhenFailureRateLimitWasNotExceeded() throws Exception {
-		FailureRateRestartStrategy restartStrategy = new FailureRateRestartStrategy(1, Time.of(1, TimeUnit.MILLISECONDS), Time.of(0, TimeUnit.SECONDS));
-		FiniteDuration timeout = new FiniteDuration(2, TimeUnit.SECONDS);
-		Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createExecutionGraph(restartStrategy);
-		ExecutionGraph eg = executionGraphInstanceTuple.f0;
-
-		//task restarted many times, but after all job is still running, because rate limit was not exceeded
-		restartAfterFailure(eg, timeout, false);
-		restartAfterFailure(eg, timeout, false);
-		restartAfterFailure(eg, timeout, false);
-		assertEquals(JobStatus.RUNNING, eg.getState());
-	}
-
-	@Test
 	public void testCancelWhileRestarting() throws Exception {
 		// We want to manually control the restart and delay
 		RestartStrategy restartStrategy = new InfiniteDelayRestartStrategy();
@@ -618,23 +592,20 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		}
 
 		@Override
-		public void restart(final ExecutionGraph executionGraph) {
-			Futures.future(new Callable<Object>() {
+		public void restart(final RestartCallback restarter, ScheduledExecutor executor) {
+			executor.execute(new Runnable() {
 				@Override
-				public Object call() throws Exception {
+				public void run() {
 					try {
-
 						Await.ready(doRestart.future(), timeout);
-						executionGraph.restart();
+						restarter.triggerFullRecovery();
 					} catch (Exception e) {
 						exception = e;
 					}
 
 					restartDone.success(true);
-
-					return null;
 				}
-			}, TestingUtils.defaultExecutionContext());
+			});
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 140e984..51b5c7f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -254,9 +254,7 @@ public class ExecutionGraphTestUtils {
 	 * restart strategy.
 	 */
 	public static ExecutionGraph createSimpleTestGraph(RestartStrategy restartStrategy) throws Exception {
-		JobVertex vertex = new JobVertex("vertex");
-		vertex.setInvokableClass(NoOpInvokable.class);
-		vertex.setParallelism(10);
+		JobVertex vertex = createNoOpVertex(10);
 
 		return createSimpleTestGraph(new JobID(), restartStrategy, vertex);
 	}
@@ -313,6 +311,13 @@ public class ExecutionGraphTestUtils {
 				1,
 				TEST_LOGGER);
 	}
+	
+	public static JobVertex createNoOpVertex(int parallelism) {
+		JobVertex vertex = new JobVertex("vertex");
+		vertex.setInvokableClass(NoOpInvokable.class);
+		vertex.setParallelism(parallelism);
+		return vertex;
+	}
 
 	// ------------------------------------------------------------------------
 	//  utility mocking methods

http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategyTest.java
new file mode 100644
index 0000000..b42dd77
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategyTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+
+import org.junit.After;
+import org.junit.Test;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit test for the {@link FailureRateRestartStrategy}.
+ */
+public class FailureRateRestartStrategyTest {
+
+	public final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
+
+	public final ScheduledExecutor executor = new ScheduledExecutorServiceAdapter(executorService);
+
+	@After
+	public void shutdownExecutor() {
+		executorService.shutdownNow();
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testManyFailuresWithinRate() throws Exception {
+		final int numAttempts = 10;
+		final int intervalMillis = 1;
+
+		final FailureRateRestartStrategy restartStrategy =
+				new FailureRateRestartStrategy(1, Time.milliseconds(intervalMillis), Time.milliseconds(0));
+
+		for (int attempsLeft = numAttempts; attempsLeft > 0; --attempsLeft) {
+			assertTrue(restartStrategy.canRestart());
+			restartStrategy.restart(new NoOpRestarter(), executor);
+			sleepGuaranteed(2 * intervalMillis);
+		}
+
+		assertTrue(restartStrategy.canRestart());
+	}
+
+	@Test
+	public void testFailuresExceedingRate() throws Exception {
+		final int numFailures = 3;
+		final int intervalMillis = 10_000;
+
+		final FailureRateRestartStrategy restartStrategy =
+				new FailureRateRestartStrategy(numFailures, Time.milliseconds(intervalMillis), Time.milliseconds(0));
+
+		for (int failuresLeft = numFailures; failuresLeft > 0; --failuresLeft) {
+			assertTrue(restartStrategy.canRestart());
+			restartStrategy.restart(new NoOpRestarter(), executor);
+		}
+
+		// now the rate should be exceeded
+		assertFalse(restartStrategy.canRestart());
+	}
+
+	@Test
+	public void testDelay() throws Exception {
+		final long restartDelay = 2;
+		final int numberRestarts = 10;
+
+		final FailureRateRestartStrategy strategy =
+			new FailureRateRestartStrategy(numberRestarts + 1, Time.milliseconds(1), Time.milliseconds(restartDelay));
+
+		for (int restartsLeft = numberRestarts; restartsLeft > 0; --restartsLeft) {
+			assertTrue(strategy.canRestart());
+
+			final OneShotLatch sync = new OneShotLatch();
+			final RestartCallback restarter = new LatchedRestarter(sync);
+
+			final long time = System.nanoTime();
+			strategy.restart(restarter, executor);
+			sync.await();
+
+			final long elapsed = System.nanoTime() - time;
+			assertTrue("Not enough delay", elapsed >= restartDelay * 1_000_000);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This method makes sure that the thread sleeps for at least the given interval, even if a sleep call returns early.
+	 */
+	private static void sleepGuaranteed(long millis) throws InterruptedException {
+		final long deadline = System.nanoTime() + millis * 1_000_000;
+
+		long nanosToSleep;
+		while ((nanosToSleep = deadline - System.nanoTime()) > 0) {
+			long millisToSleep = nanosToSleep / 1_000_000;
+			if (nanosToSleep % 1_000_000 != 0) {
+				millisToSleep++;
+			}
+
+			Thread.sleep(millisToSleep);
+		}
+	}
+	
+	
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java
index 4beedb0..17504d8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java
@@ -18,34 +18,75 @@
 
 package org.apache.flink.runtime.executiongraph.restart;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 
-import com.google.common.util.concurrent.MoreExecutors;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.junit.After;
 import org.junit.Test;
-import org.mockito.Mockito;
-import scala.concurrent.ExecutionContext$;
 
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit test for the {@link FixedDelayRestartStrategy}.
+ */
 public class FixedDelayRestartStrategyTest {
 
+	public final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
+
+	public final ScheduledExecutor executor = new ScheduledExecutorServiceAdapter(executorService);
+
+	@After
+	public void shutdownExecutor() {
+		executorService.shutdownNow();
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testNumberOfRestarts() throws Exception {
+		final int numberRestarts = 10;
+
+		final FixedDelayRestartStrategy strategy =
+				new FixedDelayRestartStrategy(numberRestarts, 0L);
+
+		for (int restartsLeft = numberRestarts; restartsLeft > 0; --restartsLeft) {
+			// two calls to 'canRestart()' to make sure this is not used to maintain the counter
+			assertTrue(strategy.canRestart());
+			assertTrue(strategy.canRestart());
+
+			strategy.restart(new NoOpRestarter(), executor);
+		}
+
+		assertFalse(strategy.canRestart());
+	}
+
 	@Test
-	public void testFixedDelayRestartStrategy() {
-		int numberRestarts = 10;
-		long restartDelay = 10;
+	public void testDelay() throws Exception {
+		final long restartDelay = 10;
+		final int numberRestarts = 10;
+
+		final FixedDelayRestartStrategy strategy =
+				new FixedDelayRestartStrategy(numberRestarts, restartDelay);
+
+		for (int restartsLeft = numberRestarts; restartsLeft > 0; --restartsLeft) {
+			assertTrue(strategy.canRestart());
 
-		FixedDelayRestartStrategy fixedDelayRestartStrategy = new FixedDelayRestartStrategy(
-			numberRestarts,
-			restartDelay);
+			final OneShotLatch sync = new OneShotLatch();
+			final RestartCallback restarter = new LatchedRestarter(sync);
 
-		ExecutionGraph executionGraph = mock(ExecutionGraph.class);
-		when(executionGraph.getFutureExecutor())
-			.thenReturn(ExecutionContext$.MODULE$.fromExecutor(MoreExecutors.directExecutor()));
+			final long time = System.nanoTime();
+			strategy.restart(restarter, executor);
+			sync.await();
 
-		while(fixedDelayRestartStrategy.canRestart()) {
-			fixedDelayRestartStrategy.restart(executionGraph);
+			final long elapsed = System.nanoTime() - time;
+			assertTrue("Not enough delay", elapsed >= restartDelay * 1_000_000);
 		}
 
-		Mockito.verify(executionGraph, Mockito.times(numberRestarts)).restart();
+		assertFalse(strategy.canRestart());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java
index c1cbdd3..d145298 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph.restart;
 
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,6 +28,7 @@ import org.slf4j.LoggerFactory;
  * Actually {@link ExecutionGraph} will never be restarted. No additional threads will be used.
  */
 public class InfiniteDelayRestartStrategy implements RestartStrategy {
+
 	private static final Logger LOG = LoggerFactory.getLogger(InfiniteDelayRestartStrategy.class);
 
 	private final int maxRestartAttempts;
@@ -43,15 +45,11 @@ public class InfiniteDelayRestartStrategy implements RestartStrategy {
 
 	@Override
 	public boolean canRestart() {
-		if (maxRestartAttempts >= 0) {
-			return restartAttemptCounter < maxRestartAttempts;
-		} else {
-			return true;
-		}
+		return maxRestartAttempts < 0 || restartAttemptCounter < maxRestartAttempts;
 	}
 
 	@Override
-	public void restart(ExecutionGraph executionGraph) {
+	public void restart(RestartCallback restarter, ScheduledExecutor executor) {
 		LOG.info("Delaying retry of job execution forever");
 
 		if (maxRestartAttempts >= 0) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/LatchedRestarter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/LatchedRestarter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/LatchedRestarter.java
new file mode 100644
index 0000000..7682fab
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/LatchedRestarter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.core.testutils.OneShotLatch;
+
+/**
+ * A testing RestartCallback that triggers a latch when restart is triggered.
+ */
+class LatchedRestarter implements RestartCallback {
+
+	private final OneShotLatch latch;
+
+	LatchedRestarter(OneShotLatch latch) {
+		this.latch = latch;
+	}
+
+	@Override
+	public void triggerFullRecovery() {
+		latch.trigger();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/NoOpRestarter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/NoOpRestarter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/NoOpRestarter.java
new file mode 100644
index 0000000..04a7c10
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/NoOpRestarter.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+/**
+ * A testing RestartCallback that does nothing. 
+ */
+class NoOpRestarter implements RestartCallback {
+
+	@Override
+	public void triggerFullRecovery() {}
+}


[3/3] flink git commit: [FLINK-7231] [distr. coordination] Fix slot release affecting SlotSharingGroup cleanup

Posted by se...@apache.org.
[FLINK-7231] [distr. coordination] Fix slot release affecting SlotSharingGroup cleanup


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/39f5b114
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/39f5b114
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/39f5b114

Branch: refs/heads/release-1.3
Commit: 39f5b1144167dcb80e8708f4cb5426e76f648026
Parents: e6348fb
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jul 19 10:24:52 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jul 20 15:43:14 2017 +0200

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  |   9 +-
 .../ExecutionGraphRestartTest.java              | 149 ++++++++++++++++---
 .../executiongraph/ExecutionGraphTestUtils.java |  19 ++-
 .../utils/NotCancelAckingTaskGateway.java       |  32 ++++
 4 files changed, 183 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/39f5b114/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index a7d768b..f9d2d69 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -929,13 +929,14 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 					}
 					catch (Throwable t) {
 						// we catch everything here to make sure cleanup happens and the
-						// ExecutionGraph notices
-						// we need to go into recovery and make sure to release all slots
+						// ExecutionGraph notices the error
+
+						// we need to release all slots before going into recovery!
 						try {
-							failGlobal(t);
+							ExecutionGraphUtils.releaseAllSlotsSilently(resources);
 						}
 						finally {
-							ExecutionGraphUtils.releaseAllSlotsSilently(resources);
+							failGlobal(t);
 						}
 					}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/39f5b114/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 3ce6baa..bd8b4ae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -21,13 +21,12 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
@@ -35,24 +34,33 @@ import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.NotCancelAckingTaskGateway;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
-import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.After;
 import org.junit.Test;
 
 import scala.concurrent.Await;
@@ -62,8 +70,12 @@ import scala.concurrent.duration.FiniteDuration;
 import scala.concurrent.impl.Promise;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.util.Iterator;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
@@ -85,6 +97,15 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 	private final static int NUM_TASKS = 31;
 
+	private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(4);
+
+	@After
+	public void shutdown() {
+		executor.shutdownNow();
+	}
+
+	// ------------------------------------------------------------------------
+
 	@Test
 	public void testNoManualRestart() throws Exception {
 		NoRestartStrategy restartStrategy = new NoRestartStrategy();
@@ -661,10 +682,117 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		}
 	}
 
+	@Test
+	public void testRestartWithEagerSchedulingAndSlotSharing() throws Exception {
+		// this test is inconclusive if not used with a proper multi-threaded executor
+		assertTrue("test assumptions violated", ((ThreadPoolExecutor) executor).getCorePoolSize() > 1);
+
+		final int parallelism = 20;
+		final Scheduler scheduler = createSchedulerWithInstances(parallelism);
+
+		final SlotSharingGroup sharingGroup = new SlotSharingGroup();
+
+		final JobVertex source = new JobVertex("source");
+		source.setInvokableClass(NoOpInvokable.class);
+		source.setParallelism(parallelism);
+		source.setSlotSharingGroup(sharingGroup);
+
+		final JobVertex sink = new JobVertex("sink");
+		sink.setInvokableClass(NoOpInvokable.class);
+		sink.setParallelism(parallelism);
+		sink.setSlotSharingGroup(sharingGroup);
+		sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED);
+
+		final ExecutionGraph eg = ExecutionGraphTestUtils.createExecutionGraph(
+				new JobID(), scheduler, new FixedDelayRestartStrategy(Integer.MAX_VALUE, 0), executor, source, sink);
+
+		eg.setScheduleMode(ScheduleMode.EAGER);
+		eg.scheduleForExecution();
+
+		waitUntilDeployedAndSwitchToRunning(eg, 1000);
+
+		// fail into 'RESTARTING'
+		eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt().fail(
+				new Exception("intended test failure"));
+
+		assertEquals(JobStatus.FAILING, eg.getState());
+		completeCancellingForAllVertices(eg);
+
+		// clean termination
+		waitUntilJobStatus(eg, JobStatus.RUNNING, 1000);
+		waitUntilDeployedAndSwitchToRunning(eg, 1000);
+		finishAllVertices(eg);
+		waitUntilJobStatus(eg, JobStatus.FINISHED, 1000);
+	}
+
+	@Test
+	public void testRestartWithSlotSharingAndNotEnoughResources() throws Exception {
+		// this test is inconclusive if not used with a proper multi-threaded executor
+		assertTrue("test assumptions violated", ((ThreadPoolExecutor) executor).getCorePoolSize() > 1);
+
+		final int numRestarts = 10;
+		final int parallelism = 20;
+
+		final Scheduler scheduler = createSchedulerWithInstances(parallelism - 1);
+
+		final SlotSharingGroup sharingGroup = new SlotSharingGroup();
+
+		final JobVertex source = new JobVertex("source");
+		source.setInvokableClass(NoOpInvokable.class);
+		source.setParallelism(parallelism);
+		source.setSlotSharingGroup(sharingGroup);
+
+		final JobVertex sink = new JobVertex("sink");
+		sink.setInvokableClass(NoOpInvokable.class);
+		sink.setParallelism(parallelism);
+		sink.setSlotSharingGroup(sharingGroup);
+		sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED);
+
+		final ExecutionGraph eg = ExecutionGraphTestUtils.createExecutionGraph(
+				new JobID(), scheduler, new FixedDelayRestartStrategy(numRestarts, 0), executor, source, sink);
+
+		eg.setScheduleMode(ScheduleMode.EAGER);
+		eg.scheduleForExecution();
+
+		// wait until no more changes happen
+		while (eg.getNumberOfFullRestarts() < numRestarts) {
+			Thread.sleep(1);
+		}
+
+		waitUntilJobStatus(eg, JobStatus.FAILED, 1000);
+
+		final Throwable t = eg.getFailureCause();
+		if (!(t instanceof NoResourceAvailableException)) {
+			ExceptionUtils.rethrowException(t, t.getMessage());
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
 
+	private Scheduler createSchedulerWithInstances(int num) {
+		final Scheduler scheduler = new Scheduler(executor);
+		final Instance[] instances = new Instance[num];
+
+		for (int i = 0; i < instances.length; i++) {
+			instances[i] = createInstance(55443 + i);
+			scheduler.newInstanceAvailable(instances[i]);
+		}
+
+		return scheduler;
+	}
+
+	private static Instance createInstance(int port) {
+		final HardwareDescription resources = new HardwareDescription(4, 1_000_000_000, 500_000_000, 400_000_000);
+		final TaskManagerGateway taskManager = new SimpleAckingTaskManagerGateway();
+		final TaskManagerLocation location = new TaskManagerLocation(
+				ResourceID.generate(), InetAddress.getLoopbackAddress(), port);
+		return new Instance(taskManager, location, new InstanceID(), resources, 1);
+	}
+
+	// ------------------------------------------------------------------------
+
 	private static class ControllableRestartStrategy implements RestartStrategy {
 
 		private Promise<Boolean> reachedCanRestart = new Promise.DefaultPromise<>();
@@ -773,10 +901,6 @@ public class ExecutionGraphRestartTest extends TestLogger {
 			scheduler);
 	}
 
-	private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy) throws IOException {
-		return newExecutionGraph(restartStrategy, new Scheduler(TestingUtils.defaultExecutionContext()));
-	}
-
 	private static void restartAfterFailure(ExecutionGraph eg, FiniteDuration timeout, boolean haltAfterRestart) throws InterruptedException {
 		makeAFailureAndWait(eg, timeout);
 
@@ -841,17 +965,6 @@ public class ExecutionGraphRestartTest extends TestLogger {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * A TaskManager gateway that does not ack cancellations.
-	 */
-	private static final class NotCancelAckingTaskGateway extends SimpleAckingTaskManagerGateway {
-
-		@Override
-		public org.apache.flink.runtime.concurrent.Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
-			return new FlinkCompletableFuture<>();
-		}
-	}
-
-	/**
 	 * A RestartStrategy that blocks restarting on a given {@link OneShotLatch}.
 	 */
 	private static final class TriggeredRestartStrategy implements RestartStrategy {

http://git-wip-us.apache.org/repos/asf/flink/blob/39f5b114/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 83c8fb0..dbbd3e0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -321,6 +322,16 @@ public class ExecutionGraphTestUtils {
 			RestartStrategy restartStrategy,
 			JobVertex... vertices) throws Exception {
 
+		return createExecutionGraph(jid, slotProvider, restartStrategy, TestingUtils.defaultExecutor(), vertices);
+	}
+
+	public static ExecutionGraph createExecutionGraph(
+			JobID jid,
+			SlotProvider slotProvider,
+			RestartStrategy restartStrategy,
+			ScheduledExecutorService executor,
+			JobVertex... vertices) throws Exception {
+
 		checkNotNull(jid);
 		checkNotNull(restartStrategy);
 		checkNotNull(vertices);
@@ -329,18 +340,18 @@ public class ExecutionGraphTestUtils {
 				null,
 				new JobGraph(jid, "test job", vertices),
 				new Configuration(),
-				TestingUtils.defaultExecutor(),
-				TestingUtils.defaultExecutor(),
+				executor,
+				executor,
 				slotProvider,
 				ExecutionGraphTestUtils.class.getClassLoader(),
-				mock(CheckpointRecoveryFactory.class),
+				new StandaloneCheckpointRecoveryFactory(),
 				Time.seconds(10),
 				restartStrategy,
 				new UnregisteredMetricsGroup(),
 				1,
 				TEST_LOGGER);
 	}
-	
+
 	public static JobVertex createNoOpVertex(int parallelism) {
 		JobVertex vertex = new JobVertex("vertex");
 		vertex.setInvokableClass(NoOpInvokable.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/39f5b114/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/NotCancelAckingTaskGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/NotCancelAckingTaskGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/NotCancelAckingTaskGateway.java
new file mode 100644
index 0000000..f453d20
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/NotCancelAckingTaskGateway.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.utils;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.Acknowledge;
+
+public class NotCancelAckingTaskGateway extends SimpleAckingTaskManagerGateway {
+
+	@Override
+	public org.apache.flink.runtime.concurrent.Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+		return new FlinkCompletableFuture<>();
+	}
+}
\ No newline at end of file


[2/3] flink git commit: [FLINK-7216] [distr. coordination] Guard against concurrent global failover

Posted by se...@apache.org.
[FLINK-7216] [distr. coordination] Guard against concurrent global failover


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e6348fbd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e6348fbd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e6348fbd

Branch: refs/heads/release-1.3
Commit: e6348fbde1fc0ee8ea682063a4d6503ba3b68864
Parents: 6c0803d
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jul 18 19:49:56 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jul 20 15:26:43 2017 +0200

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  |  35 ++--
 .../restart/ExecutionGraphRestartCallback.java  |  23 ++-
 .../executiongraph/restart/RestartStrategy.java |   2 +-
 .../ExecutionGraphRestartTest.java              | 161 ++++++++++++++++++-
 .../executiongraph/ExecutionGraphTestUtils.java |  35 +++-
 5 files changed, 231 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e6348fbd/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 9f29faf..a7d768b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -966,7 +966,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				if (transitionState(current, JobStatus.CANCELLING)) {
 
 					// make sure no concurrent local actions interfere with the cancellation
-					incrementGlobalModVersion();
+					final long globalVersionForRestart = incrementGlobalModVersion();
 
 					final ArrayList<Future<?>> futures = new ArrayList<>(verticesInCreationOrder.size());
 
@@ -980,7 +980,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 					allTerminal.thenAccept(new AcceptFunction<Void>() {
 						@Override
 						public void accept(Void value) {
-							allVerticesInTerminalState();
+							// cancellations may currently be overridden by failures which trigger
+							// restarts, so we need to pass a proper restart global version here
+							allVerticesInTerminalState(globalVersionForRestart);
 						}
 					});
 
@@ -1085,17 +1087,20 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				return;
 			}
 			else if (current == JobStatus.RESTARTING) {
+				// we handle 'failGlobal()' while in 'RESTARTING' as a safety net in case something
+				// has gone wrong in 'RESTARTING' and we need to re-attempt the restarts
 				this.failureCause = t;
 
-				if (tryRestartOrFail()) {
+				final long globalVersionForRestart = incrementGlobalModVersion();
+				if (tryRestartOrFail(globalVersionForRestart)) {
 					return;
 				}
 			}
 			else if (transitionState(current, JobStatus.FAILING, t)) {
 				this.failureCause = t;
 
-				// make sure no concurrent local actions interfere with the cancellation
-				incrementGlobalModVersion();
+				// make sure no concurrent local or global actions interfere with the failover
+				final long globalVersionForRestart = incrementGlobalModVersion();
 
 				// we build a future that is complete once all vertices have reached a terminal state
 				final ArrayList<Future<?>> futures = new ArrayList<>(verticesInCreationOrder.size());
@@ -1109,7 +1114,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				allTerminal.thenAccept(new AcceptFunction<Void>() {
 					@Override
 					public void accept(Void value) {
-						allVerticesInTerminalState();
+						allVerticesInTerminalState(globalVersionForRestart);
 					}
 				});
 
@@ -1120,10 +1125,16 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		}
 	}
 
-	public void restart() {
+	public void restart(long expectedGlobalVersion) {
 		try {
 			synchronized (progressLock) {
-				JobStatus current = state;
+				// check the global version to see whether this recovery attempt is still valid
+				if (globalModVersion != expectedGlobalVersion) {
+					LOG.info("Concurrent full restart subsumed this restart.");
+					return;
+				}
+
+				final JobStatus current = state;
 
 				if (current == JobStatus.CANCELED) {
 					LOG.info("Canceled job during restart. Aborting restart.");
@@ -1329,7 +1340,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 * This method is a callback during cancellation/failover and called when all tasks
 	 * have reached a terminal state (cancelled/failed/finished).
 	 */
-	private void allVerticesInTerminalState() {
+	private void allVerticesInTerminalState(long expectedGlobalVersionForRestart) {
 		// we are done, transition to the final state
 		JobStatus current;
 		while (true) {
@@ -1345,7 +1356,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				}
 			}
 			else if (current == JobStatus.FAILING) {
-				if (tryRestartOrFail()) {
+				if (tryRestartOrFail(expectedGlobalVersionForRestart)) {
 					break;
 				}
 				// concurrent job status change, let's check again
@@ -1374,7 +1385,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 *
 	 * @return true if the operation could be executed; false if a concurrent job status change occurred
 	 */
-	private boolean tryRestartOrFail() {
+	private boolean tryRestartOrFail(long globalModVersionForRestart) {
 		JobStatus currentState = state;
 
 		if (currentState == JobStatus.FAILING || currentState == JobStatus.RESTARTING) {
@@ -1392,7 +1403,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				if (isRestartable && transitionState(currentState, JobStatus.RESTARTING)) {
 					LOG.info("Restarting the job {} ({}).", getJobName(), getJobID());
 
-					RestartCallback restarter = new ExecutionGraphRestartCallback(this);
+					RestartCallback restarter = new ExecutionGraphRestartCallback(this, globalModVersionForRestart);
 					restartStrategy.restart(restarter, new ScheduledExecutorServiceAdapter(futureExecutor));
 
 					return true;

http://git-wip-us.apache.org/repos/asf/flink/blob/e6348fbd/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java
index 5874f91..7f98110 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java
@@ -25,27 +25,38 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A {@link RestartCallback} that abstracts restart calls on an {@link ExecutionGraph}. 
- * 
+ * A {@link RestartCallback} that abstracts restart calls on an {@link ExecutionGraph}.
+ *
  * <p>This callback implementation is one-shot; it can only be used once.
  */
 public class ExecutionGraphRestartCallback implements RestartCallback {
 
-	/** The ExecutionGraph to restart */
+	/** The ExecutionGraph to restart. */
 	private final ExecutionGraph execGraph;
 
-	/** Atomic flag to make sure this is used only once */
+	/** Atomic flag to make sure this is used only once. */
 	private final AtomicBoolean used;
 
-	public ExecutionGraphRestartCallback(ExecutionGraph execGraph) {
+	/** The globalModVersion that the ExecutionGraph needs to have for the restart to go through. */
+	private final long expectedGlobalModVersion;
+
+	/**
+	 * Creates a new ExecutionGraphRestartCallback.
+	 *
+	 * @param execGraph The ExecutionGraph to restart
+	 * @param expectedGlobalModVersion  The globalModVersion that the ExecutionGraph needs to have
+	 *                                  for the restart to go through
+	 */
+	public ExecutionGraphRestartCallback(ExecutionGraph execGraph, long expectedGlobalModVersion) {
 		this.execGraph = checkNotNull(execGraph);
 		this.used = new AtomicBoolean(false);
+		this.expectedGlobalModVersion = expectedGlobalModVersion;
 	}
 
 	@Override
 	public void triggerFullRecovery() {
 		if (used.compareAndSet(false, true)) {
-			execGraph.restart();
+			execGraph.restart(expectedGlobalModVersion);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e6348fbd/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
index 60e2e8b..ffa2777 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
@@ -37,7 +37,7 @@ public interface RestartStrategy {
 	 * Called by the ExecutionGraph to eventually trigger a full recovery.
 	 * The recovery must be triggered on the given callback object, and may be delayed
 	 * with the help of the given scheduled executor.
-	 * 
+	 *
 	 * <p>The thread that calls this method is not supposed to block/sleep.
 	 *
 	 * @param restarter The hook to restart the ExecutionGraph

http://git-wip-us.apache.org/repos/asf/flink/blob/e6348fbd/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 81702a2..3ce6baa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -36,10 +36,13 @@ import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
 import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
@@ -60,9 +63,16 @@ import scala.concurrent.impl.Promise;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.completeCancellingForAllVertices;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createSimpleTestGraph;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.finishAllVertices;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilDeployedAndSwitchToRunning;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilJobStatus;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -90,7 +100,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		assertEquals(JobStatus.FAILED, eg.getState());
 
 		// This should not restart the graph.
-		eg.restart();
+		eg.restart(eg.getGlobalModVersion());
 
 		assertEquals(JobStatus.FAILED, eg.getState());
 	}
@@ -187,7 +197,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		assertEquals(JobStatus.CANCELED, executionGraph.getState());
 
 		// The restart has been aborted
-		executionGraph.restart();
+		executionGraph.restart(executionGraph.getGlobalModVersion());
 
 		assertEquals(JobStatus.CANCELED, executionGraph.getState());
 	}
@@ -254,7 +264,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		assertEquals(JobStatus.FAILED, executionGraph.getState());
 
 		// The restart has been aborted
-		executionGraph.restart();
+		executionGraph.restart(executionGraph.getGlobalModVersion());
 
 		assertEquals(JobStatus.FAILED, executionGraph.getState());
 	}
@@ -555,6 +565,106 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		assertEquals(JobStatus.SUSPENDED, eg.getState());
 	}
 
+	@Test
+	public void testConcurrentLocalFailAndRestart() throws Exception {
+		final ExecutionGraph eg = createSimpleTestGraph(new FixedDelayRestartStrategy(10, 0L));
+		eg.setScheduleMode(ScheduleMode.EAGER);
+		eg.scheduleForExecution();
+
+		waitUntilDeployedAndSwitchToRunning(eg, 1000);
+
+		final ExecutionJobVertex vertex = eg.getVerticesTopologically().iterator().next();
+		final Execution first = vertex.getTaskVertices()[0].getCurrentExecutionAttempt();
+		final Execution last = vertex.getTaskVertices()[vertex.getParallelism() - 1].getCurrentExecutionAttempt();
+
+		final OneShotLatch failTrigger = new OneShotLatch();
+		final CountDownLatch readyLatch = new CountDownLatch(2);
+
+		Thread failure1 = new Thread() {
+			@Override
+			public void run() {
+				readyLatch.countDown();
+				try {
+					failTrigger.await();
+				} catch (InterruptedException ignored) {}
+
+				first.fail(new Exception("intended test failure 1"));
+			}
+		};
+
+		Thread failure2 = new Thread() {
+			@Override
+			public void run() {
+				readyLatch.countDown();
+				try {
+					failTrigger.await();
+				} catch (InterruptedException ignored) {}
+
+				last.fail(new Exception("intended test failure 2"));
+			}
+		};
+
+		// make sure both threads start simultaneously
+		failure1.start();
+		failure2.start();
+		readyLatch.await();
+		failTrigger.trigger();
+
+		waitUntilJobStatus(eg, JobStatus.FAILING, 1000);
+		completeCancellingForAllVertices(eg);
+
+		waitUntilJobStatus(eg, JobStatus.RUNNING, 1000);
+		waitUntilDeployedAndSwitchToRunning(eg, 1000);
+		finishAllVertices(eg);
+
+		eg.waitUntilTerminal();
+		assertEquals(JobStatus.FINISHED, eg.getState());
+	}
+
+	@Test
+	public void testConcurrentGlobalFailAndRestarts() throws Exception {
+		final OneShotLatch restartTrigger = new OneShotLatch();
+
+		final int parallelism = 10;
+		final JobID jid = new JobID();
+		final JobVertex vertex = createNoOpVertex(parallelism);
+		final SlotProvider slots = new SimpleSlotProvider(jid, parallelism, new NotCancelAckingTaskGateway());
+		final TriggeredRestartStrategy restartStrategy = new TriggeredRestartStrategy(restartTrigger);
+
+		final ExecutionGraph eg = createSimpleTestGraph(jid, slots, restartStrategy, vertex);
+		eg.setScheduleMode(ScheduleMode.EAGER);
+		eg.scheduleForExecution();
+
+		waitUntilDeployedAndSwitchToRunning(eg, 1000);
+
+		// fail into 'RESTARTING'
+		eg.failGlobal(new Exception("intended test failure 1"));
+		assertEquals(JobStatus.FAILING, eg.getState());
+		completeCancellingForAllVertices(eg);
+		waitUntilJobStatus(eg, JobStatus.RESTARTING, 1000);
+
+		eg.failGlobal(new Exception("intended test failure 2"));
+		assertEquals(JobStatus.RESTARTING, eg.getState());
+
+		// release the latch so that both pending restart attempts kick in concurrently
+		restartTrigger.trigger();
+
+		waitUntilJobStatus(eg, JobStatus.RUNNING, 1000);
+		waitUntilDeployedAndSwitchToRunning(eg, 1000);
+		finishAllVertices(eg);
+
+		eg.waitUntilTerminal();
+		assertEquals(JobStatus.FINISHED, eg.getState());
+
+		if (eg.getNumberOfFullRestarts() > 2) {
+			fail("Too many restarts: " + eg.getNumberOfFullRestarts());
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
 	private static class ControllableRestartStrategy implements RestartStrategy {
 
 		private Promise<Boolean> reachedCanRestart = new Promise.DefaultPromise<>();
@@ -727,4 +837,49 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 		assertEquals(JobStatus.FINISHED, eg.getState());
 	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A TaskManager gateway that does not ack cancellations.
+	 */
+	private static final class NotCancelAckingTaskGateway extends SimpleAckingTaskManagerGateway {
+
+		@Override
+		public org.apache.flink.runtime.concurrent.Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+			return new FlinkCompletableFuture<>();
+		}
+	}
+
+	/**
+	 * A RestartStrategy that blocks restarting on a given {@link OneShotLatch}.
+	 */
+	private static final class TriggeredRestartStrategy implements RestartStrategy {
+
+		private final OneShotLatch latch;
+
+		TriggeredRestartStrategy(OneShotLatch latch) {
+			this.latch = latch;
+		}
+
+		@Override
+		public boolean canRestart() {
+			return true;
+		}
+
+		@Override
+		public void restart(final RestartCallback restarter, ScheduledExecutor executor) {
+			executor.execute(new Runnable() {
+				@Override
+				public void run() {
+					try {
+						latch.await();
+					} catch (InterruptedException e) {
+						Thread.currentThread().interrupt();
+					}
+					restarter.triggerFullRecovery();
+				}
+			});
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e6348fbd/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 51b5c7f..83c8fb0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -170,8 +170,8 @@ public class ExecutionGraphTestUtils {
 	}
 
 	/**
-	 * Takes all vertices in the given ExecutionGraph and switches their current
-	 * execution to RUNNING.
+	 * Takes all vertices in the given ExecutionGraph and attempts to move them
+	 * from CANCELING to CANCELED.
 	 */
 	public static void completeCancellingForAllVertices(ExecutionGraph eg) {
 		for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
@@ -181,7 +181,7 @@ public class ExecutionGraphTestUtils {
 
 	/**
 	 * Takes all vertices in the given ExecutionGraph and switches their current
-	 * execution to RUNNING.
+	 * execution to FINISHED.
 	 */
 	public static void finishAllVertices(ExecutionGraph eg) {
 		for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
@@ -189,6 +189,35 @@ public class ExecutionGraphTestUtils {
 		}
 	}
 
+	/**
+	 * Turns a newly scheduled execution graph into a state where all vertices run.
+	 * This waits until all executions have reached state 'DEPLOYING' and then switches them to running.
+	 */
+	public static void waitUntilDeployedAndSwitchToRunning(ExecutionGraph eg, long timeout) throws TimeoutException {
+		// wait until all executions have reached the 'DEPLOYING' state
+		for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
+			final Execution exec = ev.getCurrentExecutionAttempt();
+			waitUntilExecutionState(exec, ExecutionState.DEPLOYING, timeout);
+		}
+
+		// Note: As ugly as it is, we need this minor sleep, because between switching
+		// to 'DEPLOYING' and the point where 'switchToRunning()' may be called lies a race check
+		// against concurrent modifications (cancel / fail). We can only switch this to running
+		// once that check is passed. For the actual runtime, this switch is triggered by a callback
+		// from the TaskManager, which comes strictly after that. For tests, we use mock TaskManagers
+		// which cannot easily tell us when that condition has happened, unfortunately.
+		try {
+			Thread.sleep(2);
+		} catch (InterruptedException e) {
+			Thread.currentThread().interrupt();
+		}
+
+		for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
+			final Execution exec = ev.getCurrentExecutionAttempt();
+			exec.switchToRunning();
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  state modifications
 	// ------------------------------------------------------------------------