You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/07/20 13:52:19 UTC
[1/3] flink git commit: [FLINK-6665] [FLINK-6667] [distributed
coordination] Use a callback and a ScheduledExecutor for ExecutionGraph
restarts
Repository: flink
Updated Branches:
refs/heads/release-1.3 129a82fba -> 39f5b1144
[FLINK-6665] [FLINK-6667] [distributed coordination] Use a callback and a ScheduledExecutor for ExecutionGraph restarts
Initial work by zjureel@gmail.com, improved by sewen@apache.org.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c0803d5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c0803d5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c0803d5
Branch: refs/heads/release-1.3
Commit: 6c0803d5e607d8e0e647a2559570fedf7968a9dc
Parents: 129a82f
Author: zjureel <zj...@gmail.com>
Authored: Tue Jul 18 19:27:56 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jul 20 15:10:17 2017 +0200
----------------------------------------------------------------------
.../runtime/executiongraph/ExecutionGraph.java | 7 +-
.../restart/ExecutionGraphRestartCallback.java | 51 ++++++++
.../restart/ExecutionGraphRestarter.java | 45 -------
.../restart/FailureRateRestartStrategy.java | 15 ++-
.../restart/FixedDelayRestartStrategy.java | 15 ++-
.../restart/NoRestartStrategy.java | 5 +-
.../executiongraph/restart/RestartCallback.java | 32 +++++
.../executiongraph/restart/RestartStrategy.java | 12 +-
.../ExecutionGraphMetricsTest.java | 17 +--
.../ExecutionGraphRestartTest.java | 51 ++------
.../executiongraph/ExecutionGraphTestUtils.java | 11 +-
.../restart/FailureRateRestartStrategyTest.java | 128 +++++++++++++++++++
.../restart/FixedDelayRestartStrategyTest.java | 77 ++++++++---
.../restart/InfiniteDelayRestartStrategy.java | 10 +-
.../restart/LatchedRestarter.java | 38 ++++++
.../executiongraph/restart/NoOpRestarter.java | 28 ++++
16 files changed, 407 insertions(+), 135 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 7c13936..9f29faf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -44,11 +44,14 @@ import org.apache.flink.runtime.concurrent.CompletableFuture;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
+import org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback;
+import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SlotProvider;
@@ -1388,7 +1391,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
if (isRestartable && transitionState(currentState, JobStatus.RESTARTING)) {
LOG.info("Restarting the job {} ({}).", getJobName(), getJobID());
- restartStrategy.restart(this);
+
+ RestartCallback restarter = new ExecutionGraphRestartCallback(this);
+ restartStrategy.restart(restarter, new ScheduledExecutorServiceAdapter(futureExecutor));
return true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java
new file mode 100644
index 0000000..5874f91
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link RestartCallback} that abstracts restart calls on an {@link ExecutionGraph}.
+ *
+ * <p>This callback implementation is one-shot; it can only be used once.
+ */
+public class ExecutionGraphRestartCallback implements RestartCallback {
+
+ /** The ExecutionGraph to restart */
+ private final ExecutionGraph execGraph;
+
+ /** Atomic flag to make sure this is used only once */
+ private final AtomicBoolean used;
+
+ public ExecutionGraphRestartCallback(ExecutionGraph execGraph) {
+ this.execGraph = checkNotNull(execGraph);
+ this.used = new AtomicBoolean(false);
+ }
+
+ @Override
+ public void triggerFullRecovery() {
+ if (used.compareAndSet(false, true)) {
+ execGraph.restart();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestarter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestarter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestarter.java
deleted file mode 100644
index 9287694..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestarter.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph.restart;
-
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.Callable;
-
-class ExecutionGraphRestarter {
- private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraphRestarter.class);
- public static Callable<Object> restartWithDelay(final ExecutionGraph executionGraph, final long delayBetweenRestartAttemptsInMillis) {
- return new Callable<Object>() {
- @Override
- public Object call() throws Exception {
- try {
- LOG.info("Delaying retry of job execution for {} ms ...", delayBetweenRestartAttemptsInMillis);
- // do the delay
- Thread.sleep(delayBetweenRestartAttemptsInMillis);
- } catch(InterruptedException e) {
- // should only happen on shutdown
- }
- executionGraph.restart();
- return null;
- }
- };
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
index d95e1c3..6580ec3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.executiongraph.restart;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.util.Preconditions;
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
* with a fixed time delay in between.
*/
public class FailureRateRestartStrategy implements RestartStrategy {
+
private final Time failuresInterval;
private final Time delayInterval;
private final int maxFailuresPerInterval;
@@ -66,16 +67,22 @@ public class FailureRateRestartStrategy implements RestartStrategy {
}
@Override
- public void restart(final ExecutionGraph executionGraph) {
+ public void restart(final RestartCallback restarter, ScheduledExecutor executor) {
if (isRestartTimestampsQueueFull()) {
restartTimestampsDeque.remove();
}
restartTimestampsDeque.add(System.currentTimeMillis());
- FlinkFuture.supplyAsync(ExecutionGraphRestarter.restartWithDelay(executionGraph, delayInterval.toMilliseconds()), executionGraph.getFutureExecutor());
+
+ executor.schedule(new Runnable() {
+ @Override
+ public void run() {
+ restarter.triggerFullRecovery();
+ }
+ }, delayInterval.getSize(), delayInterval.getUnit());
}
private boolean isRestartTimestampsQueueFull() {
- return restartTimestampsDeque.size() == maxFailuresPerInterval;
+ return restartTimestampsDeque.size() >= maxFailuresPerInterval;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
index f51ea7c..0dd700a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
@@ -20,16 +20,19 @@ package org.apache.flink.runtime.executiongraph.restart;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.util.Preconditions;
import scala.concurrent.duration.Duration;
+import java.util.concurrent.TimeUnit;
+
/**
* Restart strategy which tries to restart the given {@link ExecutionGraph} a fixed number of times
* with a fixed time delay in between.
*/
public class FixedDelayRestartStrategy implements RestartStrategy {
+
private final int maxNumberRestartAttempts;
private final long delayBetweenRestartAttempts;
private int currentRestartAttempt;
@@ -56,9 +59,15 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
}
@Override
- public void restart(final ExecutionGraph executionGraph) {
+ public void restart(final RestartCallback restarter, ScheduledExecutor executor) {
currentRestartAttempt++;
- FlinkFuture.supplyAsync(ExecutionGraphRestarter.restartWithDelay(executionGraph, delayBetweenRestartAttempts), executionGraph.getFutureExecutor());
+
+ executor.schedule(new Runnable() {
+ @Override
+ public void run() {
+ restarter.triggerFullRecovery();
+ }
+ }, delayBetweenRestartAttempts, TimeUnit.MILLISECONDS);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
index 958d9ac..5502d2d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.executiongraph.restart;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
/**
@@ -32,8 +33,8 @@ public class NoRestartStrategy implements RestartStrategy {
}
@Override
- public void restart(ExecutionGraph executionGraph) {
- throw new RuntimeException("NoRestartStrategy does not support restart.");
+ public void restart(RestartCallback restarter, ScheduledExecutor executor) {
+ throw new UnsupportedOperationException("NoRestartStrategy does not support restart.");
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartCallback.java
new file mode 100644
index 0000000..6499be3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartCallback.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+/**
+ * A callback to trigger restarts, passed to the {@link RestartStrategy} to
+ * trigger recovery on the ExecutionGraph.
+ */
+public interface RestartCallback {
+
+ /**
+ * Triggers a full recovery in the target ExecutionGraph.
+ * A full recovery resets all vertices to the state of the latest checkpoint.
+ */
+ void triggerFullRecovery();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
index 2880c01..60e2e8b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.executiongraph.restart;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
/**
@@ -33,9 +34,14 @@ public interface RestartStrategy {
boolean canRestart();
/**
- * Restarts the given {@link ExecutionGraph}.
+ * Called by the ExecutionGraph to eventually trigger a full recovery.
+ * The recovery must be triggered on the given callback object, and may be delayed
+ * with the help of the given scheduled executor.
+ *
+ * <p>The thread that calls this method is not supposed to block/sleep.
*
- * @param executionGraph The ExecutionGraph to be restarted
+ * @param restarter The hook to restart the ExecutionGraph
+ * @param executor A scheduled executor to delay the restart
*/
- void restart(ExecutionGraph executionGraph);
+ void restart(RestartCallback restarter, ScheduledExecutor executor);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 6b5ceae..0785a26 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -24,11 +24,13 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
+import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.SimpleSlot;
@@ -255,25 +257,20 @@ public class ExecutionGraphMetricsTest extends TestLogger {
static class TestingRestartStrategy implements RestartStrategy {
- private boolean restartable = true;
- private ExecutionGraph executionGraph = null;
+ private RestartCallback restarter;
@Override
public boolean canRestart() {
- return restartable;
+ return true;
}
@Override
- public void restart(ExecutionGraph executionGraph) {
- this.executionGraph = executionGraph;
- }
-
- public void setRestartable(boolean restartable) {
- this.restartable = restartable;
+ public void restart(RestartCallback restarter, ScheduledExecutor executor) {
+ this.restarter = restarter;
}
public void restartExecutionGraph() {
- executionGraph.restart();
+ restarter.triggerFullRecovery();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index eeb6c69..81702a2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -18,21 +18,24 @@
package org.apache.flink.runtime.executiongraph;
-import akka.dispatch.Futures;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
-import org.apache.flink.runtime.executiongraph.restart.FailureRateRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -41,6 +44,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.SerializedValue;
@@ -56,11 +60,9 @@ import scala.concurrent.impl.Promise;
import java.io.IOException;
import java.util.Iterator;
-import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -159,34 +161,6 @@ public class ExecutionGraphRestartTest extends TestLogger {
}
@Test
- public void taskShouldFailWhenFailureRateLimitExceeded() throws Exception {
- FailureRateRestartStrategy restartStrategy = new FailureRateRestartStrategy(2, Time.of(10, TimeUnit.SECONDS), Time.of(0, TimeUnit.SECONDS));
- FiniteDuration timeout = new FiniteDuration(2, TimeUnit.SECONDS);
- Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createExecutionGraph(restartStrategy);
- ExecutionGraph eg = executionGraphInstanceTuple.f0;
-
- restartAfterFailure(eg, timeout, false);
- restartAfterFailure(eg, timeout, false);
- makeAFailureAndWait(eg, timeout);
- //failure rate limit exceeded, so task should be failed
- assertEquals(JobStatus.FAILED, eg.getState());
- }
-
- @Test
- public void taskShouldNotFailWhenFailureRateLimitWasNotExceeded() throws Exception {
- FailureRateRestartStrategy restartStrategy = new FailureRateRestartStrategy(1, Time.of(1, TimeUnit.MILLISECONDS), Time.of(0, TimeUnit.SECONDS));
- FiniteDuration timeout = new FiniteDuration(2, TimeUnit.SECONDS);
- Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createExecutionGraph(restartStrategy);
- ExecutionGraph eg = executionGraphInstanceTuple.f0;
-
- //task restarted many times, but after all job is still running, because rate limit was not exceeded
- restartAfterFailure(eg, timeout, false);
- restartAfterFailure(eg, timeout, false);
- restartAfterFailure(eg, timeout, false);
- assertEquals(JobStatus.RUNNING, eg.getState());
- }
-
- @Test
public void testCancelWhileRestarting() throws Exception {
// We want to manually control the restart and delay
RestartStrategy restartStrategy = new InfiniteDelayRestartStrategy();
@@ -618,23 +592,20 @@ public class ExecutionGraphRestartTest extends TestLogger {
}
@Override
- public void restart(final ExecutionGraph executionGraph) {
- Futures.future(new Callable<Object>() {
+ public void restart(final RestartCallback restarter, ScheduledExecutor executor) {
+ executor.execute(new Runnable() {
@Override
- public Object call() throws Exception {
+ public void run() {
try {
-
Await.ready(doRestart.future(), timeout);
- executionGraph.restart();
+ restarter.triggerFullRecovery();
} catch (Exception e) {
exception = e;
}
restartDone.success(true);
-
- return null;
}
- }, TestingUtils.defaultExecutionContext());
+ });
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 140e984..51b5c7f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -254,9 +254,7 @@ public class ExecutionGraphTestUtils {
* restart strategy.
*/
public static ExecutionGraph createSimpleTestGraph(RestartStrategy restartStrategy) throws Exception {
- JobVertex vertex = new JobVertex("vertex");
- vertex.setInvokableClass(NoOpInvokable.class);
- vertex.setParallelism(10);
+ JobVertex vertex = createNoOpVertex(10);
return createSimpleTestGraph(new JobID(), restartStrategy, vertex);
}
@@ -313,6 +311,13 @@ public class ExecutionGraphTestUtils {
1,
TEST_LOGGER);
}
+
+ public static JobVertex createNoOpVertex(int parallelism) {
+ JobVertex vertex = new JobVertex("vertex");
+ vertex.setInvokableClass(NoOpInvokable.class);
+ vertex.setParallelism(parallelism);
+ return vertex;
+ }
// ------------------------------------------------------------------------
// utility mocking methods
http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategyTest.java
new file mode 100644
index 0000000..b42dd77
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategyTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+
+import org.junit.After;
+import org.junit.Test;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit test for the {@link FailureRateRestartStrategy}.
+ */
+public class FailureRateRestartStrategyTest {
+
+ public final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
+
+ public final ScheduledExecutor executor = new ScheduledExecutorServiceAdapter(executorService);
+
+ @After
+ public void shutdownExecutor() {
+ executorService.shutdownNow();
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testManyFailuresWithinRate() throws Exception {
+ final int numAttempts = 10;
+ final int intervalMillis = 1;
+
+ final FailureRateRestartStrategy restartStrategy =
+ new FailureRateRestartStrategy(1, Time.milliseconds(intervalMillis), Time.milliseconds(0));
+
+ for (int attempsLeft = numAttempts; attempsLeft > 0; --attempsLeft) {
+ assertTrue(restartStrategy.canRestart());
+ restartStrategy.restart(new NoOpRestarter(), executor);
+ sleepGuaranteed(2 * intervalMillis);
+ }
+
+ assertTrue(restartStrategy.canRestart());
+ }
+
+ @Test
+ public void testFailuresExceedingRate() throws Exception {
+ final int numFailures = 3;
+ final int intervalMillis = 10_000;
+
+ final FailureRateRestartStrategy restartStrategy =
+ new FailureRateRestartStrategy(numFailures, Time.milliseconds(intervalMillis), Time.milliseconds(0));
+
+ for (int failuresLeft = numFailures; failuresLeft > 0; --failuresLeft) {
+ assertTrue(restartStrategy.canRestart());
+ restartStrategy.restart(new NoOpRestarter(), executor);
+ }
+
+ // now the rate should be exceeded
+ assertFalse(restartStrategy.canRestart());
+ }
+
+ @Test
+ public void testDelay() throws Exception {
+ final long restartDelay = 2;
+ final int numberRestarts = 10;
+
+ final FailureRateRestartStrategy strategy =
+ new FailureRateRestartStrategy(numberRestarts + 1, Time.milliseconds(1), Time.milliseconds(restartDelay));
+
+ for (int restartsLeft = numberRestarts; restartsLeft > 0; --restartsLeft) {
+ assertTrue(strategy.canRestart());
+
+ final OneShotLatch sync = new OneShotLatch();
+ final RestartCallback restarter = new LatchedRestarter(sync);
+
+ final long time = System.nanoTime();
+ strategy.restart(restarter, executor);
+ sync.await();
+
+ final long elapsed = System.nanoTime() - time;
+ assertTrue("Not enough delay", elapsed >= restartDelay * 1_000_000);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * This method makes sure that the thread sleeps for at least the given interval, even if a sleep call returns early.
+ */
+ private static void sleepGuaranteed(long millis) throws InterruptedException {
+ final long deadline = System.nanoTime() + millis * 1_000_000;
+
+ long nanosToSleep;
+ while ((nanosToSleep = deadline - System.nanoTime()) > 0) {
+ long millisToSleep = nanosToSleep / 1_000_000;
+ if (nanosToSleep % 1_000_000 != 0) {
+ millisToSleep++;
+ }
+
+ Thread.sleep(millisToSleep);
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java
index 4beedb0..17504d8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java
@@ -18,34 +18,75 @@
package org.apache.flink.runtime.executiongraph.restart;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
-import com.google.common.util.concurrent.MoreExecutors;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.junit.After;
import org.junit.Test;
-import org.mockito.Mockito;
-import scala.concurrent.ExecutionContext$;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit test for the {@link FixedDelayRestartStrategy}.
+ */
public class FixedDelayRestartStrategyTest {
+ public final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
+
+ public final ScheduledExecutor executor = new ScheduledExecutorServiceAdapter(executorService);
+
+ @After
+ public void shutdownExecutor() {
+ executorService.shutdownNow();
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testNumberOfRestarts() throws Exception {
+ final int numberRestarts = 10;
+
+ final FixedDelayRestartStrategy strategy =
+ new FixedDelayRestartStrategy(numberRestarts, 0L);
+
+ for (int restartsLeft = numberRestarts; restartsLeft > 0; --restartsLeft) {
+ // two calls to 'canRestart()' to make sure this is not used to maintain the counter
+ assertTrue(strategy.canRestart());
+ assertTrue(strategy.canRestart());
+
+ strategy.restart(new NoOpRestarter(), executor);
+ }
+
+ assertFalse(strategy.canRestart());
+ }
+
@Test
- public void testFixedDelayRestartStrategy() {
- int numberRestarts = 10;
- long restartDelay = 10;
+ public void testDelay() throws Exception {
+ final long restartDelay = 10;
+ final int numberRestarts = 10;
+
+ final FixedDelayRestartStrategy strategy =
+ new FixedDelayRestartStrategy(numberRestarts, restartDelay);
+
+ for (int restartsLeft = numberRestarts; restartsLeft > 0; --restartsLeft) {
+ assertTrue(strategy.canRestart());
- FixedDelayRestartStrategy fixedDelayRestartStrategy = new FixedDelayRestartStrategy(
- numberRestarts,
- restartDelay);
+ final OneShotLatch sync = new OneShotLatch();
+ final RestartCallback restarter = new LatchedRestarter(sync);
- ExecutionGraph executionGraph = mock(ExecutionGraph.class);
- when(executionGraph.getFutureExecutor())
- .thenReturn(ExecutionContext$.MODULE$.fromExecutor(MoreExecutors.directExecutor()));
+ final long time = System.nanoTime();
+ strategy.restart(restarter, executor);
+ sync.await();
- while(fixedDelayRestartStrategy.canRestart()) {
- fixedDelayRestartStrategy.restart(executionGraph);
+ final long elapsed = System.nanoTime() - time;
+ assertTrue("Not enough delay", elapsed >= restartDelay * 1_000_000);
}
- Mockito.verify(executionGraph, Mockito.times(numberRestarts)).restart();
+ assertFalse(strategy.canRestart());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java
index c1cbdd3..d145298 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.executiongraph.restart;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,6 +28,7 @@ import org.slf4j.LoggerFactory;
* Actually {@link ExecutionGraph} will never be restarted. No additional threads will be used.
*/
public class InfiniteDelayRestartStrategy implements RestartStrategy {
+
private static final Logger LOG = LoggerFactory.getLogger(InfiniteDelayRestartStrategy.class);
private final int maxRestartAttempts;
@@ -43,15 +45,11 @@ public class InfiniteDelayRestartStrategy implements RestartStrategy {
@Override
public boolean canRestart() {
- if (maxRestartAttempts >= 0) {
- return restartAttemptCounter < maxRestartAttempts;
- } else {
- return true;
- }
+ return maxRestartAttempts < 0 || restartAttemptCounter < maxRestartAttempts;
}
@Override
- public void restart(ExecutionGraph executionGraph) {
+ public void restart(RestartCallback restarter, ScheduledExecutor executor) {
LOG.info("Delaying retry of job execution forever");
if (maxRestartAttempts >= 0) {
http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/LatchedRestarter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/LatchedRestarter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/LatchedRestarter.java
new file mode 100644
index 0000000..7682fab
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/LatchedRestarter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.core.testutils.OneShotLatch;
+
+/**
+ * A testing RestartCallback that triggers a latch when restart is triggered.
+ */
+class LatchedRestarter implements RestartCallback {
+
+ private final OneShotLatch latch;
+
+ LatchedRestarter(OneShotLatch latch) {
+ this.latch = latch;
+ }
+
+ @Override
+ public void triggerFullRecovery() {
+ latch.trigger();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/NoOpRestarter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/NoOpRestarter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/NoOpRestarter.java
new file mode 100644
index 0000000..04a7c10
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/NoOpRestarter.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+/**
+ * A testing RestartCallback that does nothing.
+ */
+class NoOpRestarter implements RestartCallback {
+
+ @Override
+ public void triggerFullRecovery() {}
+}
[3/3] flink git commit: [FLINK-7231] [distr. coordination] Fix slot
release affecting SlotSharingGroup cleanup
Posted by se...@apache.org.
[FLINK-7231] [distr. coordination] Fix slot release affecting SlotSharingGroup cleanup
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/39f5b114
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/39f5b114
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/39f5b114
Branch: refs/heads/release-1.3
Commit: 39f5b1144167dcb80e8708f4cb5426e76f648026
Parents: e6348fb
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jul 19 10:24:52 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jul 20 15:43:14 2017 +0200
----------------------------------------------------------------------
.../runtime/executiongraph/ExecutionGraph.java | 9 +-
.../ExecutionGraphRestartTest.java | 149 ++++++++++++++++---
.../executiongraph/ExecutionGraphTestUtils.java | 19 ++-
.../utils/NotCancelAckingTaskGateway.java | 32 ++++
4 files changed, 183 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/39f5b114/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index a7d768b..f9d2d69 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -929,13 +929,14 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}
catch (Throwable t) {
// we catch everything here to make sure cleanup happens and the
- // ExecutionGraph notices
- // we need to go into recovery and make sure to release all slots
+ // ExecutionGraph notices the error
+
+ // we need to release all slots before going into recovery!
try {
- failGlobal(t);
+ ExecutionGraphUtils.releaseAllSlotsSilently(resources);
}
finally {
- ExecutionGraphUtils.releaseAllSlotsSilently(resources);
+ failGlobal(t);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/39f5b114/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 3ce6baa..bd8b4ae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -21,13 +21,12 @@ package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
@@ -35,24 +34,33 @@ import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.NotCancelAckingTaskGateway;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
-import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
+import org.junit.After;
import org.junit.Test;
import scala.concurrent.Await;
@@ -62,8 +70,12 @@ import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.impl.Promise;
import java.io.IOException;
+import java.net.InetAddress;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
@@ -85,6 +97,15 @@ public class ExecutionGraphRestartTest extends TestLogger {
private final static int NUM_TASKS = 31;
+ private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(4);
+
+ @After
+ public void shutdown() {
+ executor.shutdownNow();
+ }
+
+ // ------------------------------------------------------------------------
+
@Test
public void testNoManualRestart() throws Exception {
NoRestartStrategy restartStrategy = new NoRestartStrategy();
@@ -661,10 +682,117 @@ public class ExecutionGraphRestartTest extends TestLogger {
}
}
+ @Test
+ public void testRestartWithEagerSchedulingAndSlotSharing() throws Exception {
+ // this test is inconclusive if not used with a proper multi-threaded executor
+ assertTrue("test assumptions violated", ((ThreadPoolExecutor) executor).getCorePoolSize() > 1);
+
+ final int parallelism = 20;
+ final Scheduler scheduler = createSchedulerWithInstances(parallelism);
+
+ final SlotSharingGroup sharingGroup = new SlotSharingGroup();
+
+ final JobVertex source = new JobVertex("source");
+ source.setInvokableClass(NoOpInvokable.class);
+ source.setParallelism(parallelism);
+ source.setSlotSharingGroup(sharingGroup);
+
+ final JobVertex sink = new JobVertex("sink");
+ sink.setInvokableClass(NoOpInvokable.class);
+ sink.setParallelism(parallelism);
+ sink.setSlotSharingGroup(sharingGroup);
+ sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED);
+
+ final ExecutionGraph eg = ExecutionGraphTestUtils.createExecutionGraph(
+ new JobID(), scheduler, new FixedDelayRestartStrategy(Integer.MAX_VALUE, 0), executor, source, sink);
+
+ eg.setScheduleMode(ScheduleMode.EAGER);
+ eg.scheduleForExecution();
+
+ waitUntilDeployedAndSwitchToRunning(eg, 1000);
+
+ // fail into 'RESTARTING'
+ eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt().fail(
+ new Exception("intended test failure"));
+
+ assertEquals(JobStatus.FAILING, eg.getState());
+ completeCancellingForAllVertices(eg);
+
+ // clean termination
+ waitUntilJobStatus(eg, JobStatus.RUNNING, 1000);
+ waitUntilDeployedAndSwitchToRunning(eg, 1000);
+ finishAllVertices(eg);
+ waitUntilJobStatus(eg, JobStatus.FINISHED, 1000);
+ }
+
+ @Test
+ public void testRestartWithSlotSharingAndNotEnoughResources() throws Exception {
+ // this test is inconclusive if not used with a proper multi-threaded executor
+ assertTrue("test assumptions violated", ((ThreadPoolExecutor) executor).getCorePoolSize() > 1);
+
+ final int numRestarts = 10;
+ final int parallelism = 20;
+
+ final Scheduler scheduler = createSchedulerWithInstances(parallelism - 1);
+
+ final SlotSharingGroup sharingGroup = new SlotSharingGroup();
+
+ final JobVertex source = new JobVertex("source");
+ source.setInvokableClass(NoOpInvokable.class);
+ source.setParallelism(parallelism);
+ source.setSlotSharingGroup(sharingGroup);
+
+ final JobVertex sink = new JobVertex("sink");
+ sink.setInvokableClass(NoOpInvokable.class);
+ sink.setParallelism(parallelism);
+ sink.setSlotSharingGroup(sharingGroup);
+ sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED);
+
+ final ExecutionGraph eg = ExecutionGraphTestUtils.createExecutionGraph(
+ new JobID(), scheduler, new FixedDelayRestartStrategy(numRestarts, 0), executor, source, sink);
+
+ eg.setScheduleMode(ScheduleMode.EAGER);
+ eg.scheduleForExecution();
+
+ // wait until all configured restart attempts have been used up
+ while (eg.getNumberOfFullRestarts() < numRestarts) {
+ Thread.sleep(1);
+ }
+
+ waitUntilJobStatus(eg, JobStatus.FAILED, 1000);
+
+ final Throwable t = eg.getFailureCause();
+ if (!(t instanceof NoResourceAvailableException)) {
+ ExceptionUtils.rethrowException(t, t.getMessage());
+ }
+ }
+
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
+ private Scheduler createSchedulerWithInstances(int num) {
+ final Scheduler scheduler = new Scheduler(executor);
+ final Instance[] instances = new Instance[num];
+
+ for (int i = 0; i < instances.length; i++) {
+ instances[i] = createInstance(55443 + i);
+ scheduler.newInstanceAvailable(instances[i]);
+ }
+
+ return scheduler;
+ }
+
+ private static Instance createInstance(int port) {
+ final HardwareDescription resources = new HardwareDescription(4, 1_000_000_000, 500_000_000, 400_000_000);
+ final TaskManagerGateway taskManager = new SimpleAckingTaskManagerGateway();
+ final TaskManagerLocation location = new TaskManagerLocation(
+ ResourceID.generate(), InetAddress.getLoopbackAddress(), port);
+ return new Instance(taskManager, location, new InstanceID(), resources, 1);
+ }
+
+ // ------------------------------------------------------------------------
+
private static class ControllableRestartStrategy implements RestartStrategy {
private Promise<Boolean> reachedCanRestart = new Promise.DefaultPromise<>();
@@ -773,10 +901,6 @@ public class ExecutionGraphRestartTest extends TestLogger {
scheduler);
}
- private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy) throws IOException {
- return newExecutionGraph(restartStrategy, new Scheduler(TestingUtils.defaultExecutionContext()));
- }
-
private static void restartAfterFailure(ExecutionGraph eg, FiniteDuration timeout, boolean haltAfterRestart) throws InterruptedException {
makeAFailureAndWait(eg, timeout);
@@ -841,17 +965,6 @@ public class ExecutionGraphRestartTest extends TestLogger {
// ------------------------------------------------------------------------
/**
- * A TaskManager gateway that does not ack cancellations.
- */
- private static final class NotCancelAckingTaskGateway extends SimpleAckingTaskManagerGateway {
-
- @Override
- public org.apache.flink.runtime.concurrent.Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
- return new FlinkCompletableFuture<>();
- }
- }
-
- /**
* A RestartStrategy that blocks restarting on a given {@link OneShotLatch}.
*/
private static final class TriggeredRestartStrategy implements RestartStrategy {
http://git-wip-us.apache.org/repos/asf/flink/blob/39f5b114/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 83c8fb0..dbbd3e0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -321,6 +322,16 @@ public class ExecutionGraphTestUtils {
RestartStrategy restartStrategy,
JobVertex... vertices) throws Exception {
+ return createExecutionGraph(jid, slotProvider, restartStrategy, TestingUtils.defaultExecutor(), vertices);
+ }
+
+ public static ExecutionGraph createExecutionGraph(
+ JobID jid,
+ SlotProvider slotProvider,
+ RestartStrategy restartStrategy,
+ ScheduledExecutorService executor,
+ JobVertex... vertices) throws Exception {
+
checkNotNull(jid);
checkNotNull(restartStrategy);
checkNotNull(vertices);
@@ -329,18 +340,18 @@ public class ExecutionGraphTestUtils {
null,
new JobGraph(jid, "test job", vertices),
new Configuration(),
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
+ executor,
+ executor,
slotProvider,
ExecutionGraphTestUtils.class.getClassLoader(),
- mock(CheckpointRecoveryFactory.class),
+ new StandaloneCheckpointRecoveryFactory(),
Time.seconds(10),
restartStrategy,
new UnregisteredMetricsGroup(),
1,
TEST_LOGGER);
}
-
+
public static JobVertex createNoOpVertex(int parallelism) {
JobVertex vertex = new JobVertex("vertex");
vertex.setInvokableClass(NoOpInvokable.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/39f5b114/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/NotCancelAckingTaskGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/NotCancelAckingTaskGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/NotCancelAckingTaskGateway.java
new file mode 100644
index 0000000..f453d20
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/NotCancelAckingTaskGateway.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.utils;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.Acknowledge;
+
+public class NotCancelAckingTaskGateway extends SimpleAckingTaskManagerGateway {
+
+ @Override
+ public org.apache.flink.runtime.concurrent.Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+ return new FlinkCompletableFuture<>();
+ }
+}
\ No newline at end of file
[2/3] flink git commit: [FLINK-7216] [distr. coordination] Guard
against concurrent global failover
Posted by se...@apache.org.
[FLINK-7216] [distr. coordination] Guard against concurrent global failover
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e6348fbd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e6348fbd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e6348fbd
Branch: refs/heads/release-1.3
Commit: e6348fbde1fc0ee8ea682063a4d6503ba3b68864
Parents: 6c0803d
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jul 18 19:49:56 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jul 20 15:26:43 2017 +0200
----------------------------------------------------------------------
.../runtime/executiongraph/ExecutionGraph.java | 35 ++--
.../restart/ExecutionGraphRestartCallback.java | 23 ++-
.../executiongraph/restart/RestartStrategy.java | 2 +-
.../ExecutionGraphRestartTest.java | 161 ++++++++++++++++++-
.../executiongraph/ExecutionGraphTestUtils.java | 35 +++-
5 files changed, 231 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e6348fbd/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 9f29faf..a7d768b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -966,7 +966,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
if (transitionState(current, JobStatus.CANCELLING)) {
// make sure no concurrent local actions interfere with the cancellation
- incrementGlobalModVersion();
+ final long globalVersionForRestart = incrementGlobalModVersion();
final ArrayList<Future<?>> futures = new ArrayList<>(verticesInCreationOrder.size());
@@ -980,7 +980,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
allTerminal.thenAccept(new AcceptFunction<Void>() {
@Override
public void accept(Void value) {
- allVerticesInTerminalState();
+ // cancellations may currently be overridden by failures which trigger
+ // restarts, so we need to pass a proper restart global version here
+ allVerticesInTerminalState(globalVersionForRestart);
}
});
@@ -1085,17 +1087,20 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
return;
}
else if (current == JobStatus.RESTARTING) {
+ // we handle 'failGlobal()' while in 'RESTARTING' as a safety net in case something
+ // has gone wrong in 'RESTARTING' and we need to re-attempt the restarts
this.failureCause = t;
- if (tryRestartOrFail()) {
+ final long globalVersionForRestart = incrementGlobalModVersion();
+ if (tryRestartOrFail(globalVersionForRestart)) {
return;
}
}
else if (transitionState(current, JobStatus.FAILING, t)) {
this.failureCause = t;
- // make sure no concurrent local actions interfere with the cancellation
- incrementGlobalModVersion();
+ // make sure no concurrent local or global actions interfere with the failover
+ final long globalVersionForRestart = incrementGlobalModVersion();
// we build a future that is complete once all vertices have reached a terminal state
final ArrayList<Future<?>> futures = new ArrayList<>(verticesInCreationOrder.size());
@@ -1109,7 +1114,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
allTerminal.thenAccept(new AcceptFunction<Void>() {
@Override
public void accept(Void value) {
- allVerticesInTerminalState();
+ allVerticesInTerminalState(globalVersionForRestart);
}
});
@@ -1120,10 +1125,16 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}
}
- public void restart() {
+ public void restart(long expectedGlobalVersion) {
try {
synchronized (progressLock) {
- JobStatus current = state;
+ // check the global version to see whether this recovery attempt is still valid
+ if (globalModVersion != expectedGlobalVersion) {
+ LOG.info("Concurrent full restart subsumed this restart.");
+ return;
+ }
+
+ final JobStatus current = state;
if (current == JobStatus.CANCELED) {
LOG.info("Canceled job during restart. Aborting restart.");
@@ -1329,7 +1340,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
* This method is a callback during cancellation/failover and called when all tasks
* have reached a terminal state (cancelled/failed/finished).
*/
- private void allVerticesInTerminalState() {
+ private void allVerticesInTerminalState(long expectedGlobalVersionForRestart) {
// we are done, transition to the final state
JobStatus current;
while (true) {
@@ -1345,7 +1356,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}
}
else if (current == JobStatus.FAILING) {
- if (tryRestartOrFail()) {
+ if (tryRestartOrFail(expectedGlobalVersionForRestart)) {
break;
}
// concurrent job status change, let's check again
@@ -1374,7 +1385,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
*
* @return true if the operation could be executed; false if a concurrent job status change occurred
*/
- private boolean tryRestartOrFail() {
+ private boolean tryRestartOrFail(long globalModVersionForRestart) {
JobStatus currentState = state;
if (currentState == JobStatus.FAILING || currentState == JobStatus.RESTARTING) {
@@ -1392,7 +1403,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
if (isRestartable && transitionState(currentState, JobStatus.RESTARTING)) {
LOG.info("Restarting the job {} ({}).", getJobName(), getJobID());
- RestartCallback restarter = new ExecutionGraphRestartCallback(this);
+ RestartCallback restarter = new ExecutionGraphRestartCallback(this, globalModVersionForRestart);
restartStrategy.restart(restarter, new ScheduledExecutorServiceAdapter(futureExecutor));
return true;
http://git-wip-us.apache.org/repos/asf/flink/blob/e6348fbd/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java
index 5874f91..7f98110 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java
@@ -25,27 +25,38 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * A {@link RestartCallback} that abstracts restart calls on an {@link ExecutionGraph}.
- *
+ * A {@link RestartCallback} that abstracts restart calls on an {@link ExecutionGraph}.
+ *
* <p>This callback implementation is one-shot; it can only be used once.
*/
public class ExecutionGraphRestartCallback implements RestartCallback {
- /** The ExecutionGraph to restart */
+ /** The ExecutionGraph to restart. */
private final ExecutionGraph execGraph;
- /** Atomic flag to make sure this is used only once */
+ /** Atomic flag to make sure this is used only once. */
private final AtomicBoolean used;
- public ExecutionGraphRestartCallback(ExecutionGraph execGraph) {
+ /** The globalModVersion that the ExecutionGraph needs to have for the restart to go through. */
+ private final long expectedGlobalModVersion;
+
+ /**
+ * Creates a new ExecutionGraphRestartCallback.
+ *
+ * @param execGraph The ExecutionGraph to restart
+ * @param expectedGlobalModVersion The globalModVersion that the ExecutionGraph needs to have
+ * for the restart to go through
+ */
+ public ExecutionGraphRestartCallback(ExecutionGraph execGraph, long expectedGlobalModVersion) {
this.execGraph = checkNotNull(execGraph);
this.used = new AtomicBoolean(false);
+ this.expectedGlobalModVersion = expectedGlobalModVersion;
}
@Override
public void triggerFullRecovery() {
if (used.compareAndSet(false, true)) {
- execGraph.restart();
+ execGraph.restart(expectedGlobalModVersion);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e6348fbd/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
index 60e2e8b..ffa2777 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
@@ -37,7 +37,7 @@ public interface RestartStrategy {
* Called by the ExecutionGraph to eventually trigger a full recovery.
* The recovery must be triggered on the given callback object, and may be delayed
* with the help of the given scheduled executor.
- *
+ *
* <p>The thread that calls this method is not supposed to block/sleep.
*
* @param restarter The hook to restart the ExecutionGraph
http://git-wip-us.apache.org/repos/asf/flink/blob/e6348fbd/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 81702a2..3ce6baa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -36,10 +36,13 @@ import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
@@ -60,9 +63,16 @@ import scala.concurrent.impl.Promise;
import java.io.IOException;
import java.util.Iterator;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.completeCancellingForAllVertices;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createSimpleTestGraph;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.finishAllVertices;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilDeployedAndSwitchToRunning;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilJobStatus;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -90,7 +100,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
assertEquals(JobStatus.FAILED, eg.getState());
// This should not restart the graph.
- eg.restart();
+ eg.restart(eg.getGlobalModVersion());
assertEquals(JobStatus.FAILED, eg.getState());
}
@@ -187,7 +197,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
assertEquals(JobStatus.CANCELED, executionGraph.getState());
// The restart has been aborted
- executionGraph.restart();
+ executionGraph.restart(executionGraph.getGlobalModVersion());
assertEquals(JobStatus.CANCELED, executionGraph.getState());
}
@@ -254,7 +264,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
assertEquals(JobStatus.FAILED, executionGraph.getState());
// The restart has been aborted
- executionGraph.restart();
+ executionGraph.restart(executionGraph.getGlobalModVersion());
assertEquals(JobStatus.FAILED, executionGraph.getState());
}
@@ -555,6 +565,106 @@ public class ExecutionGraphRestartTest extends TestLogger {
assertEquals(JobStatus.SUSPENDED, eg.getState());
}
+ @Test
+ public void testConcurrentLocalFailAndRestart() throws Exception {
+ final ExecutionGraph eg = createSimpleTestGraph(new FixedDelayRestartStrategy(10, 0L));
+ eg.setScheduleMode(ScheduleMode.EAGER);
+ eg.scheduleForExecution();
+
+ waitUntilDeployedAndSwitchToRunning(eg, 1000);
+
+ final ExecutionJobVertex vertex = eg.getVerticesTopologically().iterator().next();
+ final Execution first = vertex.getTaskVertices()[0].getCurrentExecutionAttempt();
+ final Execution last = vertex.getTaskVertices()[vertex.getParallelism() - 1].getCurrentExecutionAttempt();
+
+ final OneShotLatch failTrigger = new OneShotLatch();
+ final CountDownLatch readyLatch = new CountDownLatch(2);
+
+ Thread failure1 = new Thread() {
+ @Override
+ public void run() {
+ readyLatch.countDown();
+ try {
+ failTrigger.await();
+ } catch (InterruptedException ignored) {}
+
+ first.fail(new Exception("intended test failure 1"));
+ }
+ };
+
+ Thread failure2 = new Thread() {
+ @Override
+ public void run() {
+ readyLatch.countDown();
+ try {
+ failTrigger.await();
+ } catch (InterruptedException ignored) {}
+
+ last.fail(new Exception("intended test failure 2"));
+ }
+ };
+
+ // make sure both threads start simultaneously
+ failure1.start();
+ failure2.start();
+ readyLatch.await();
+ failTrigger.trigger();
+
+ waitUntilJobStatus(eg, JobStatus.FAILING, 1000);
+ completeCancellingForAllVertices(eg);
+
+ waitUntilJobStatus(eg, JobStatus.RUNNING, 1000);
+ waitUntilDeployedAndSwitchToRunning(eg, 1000);
+ finishAllVertices(eg);
+
+ eg.waitUntilTerminal();
+ assertEquals(JobStatus.FINISHED, eg.getState());
+ }
+
+ @Test
+ public void testConcurrentGlobalFailAndRestarts() throws Exception {
+ final OneShotLatch restartTrigger = new OneShotLatch();
+
+ final int parallelism = 10;
+ final JobID jid = new JobID();
+ final JobVertex vertex = createNoOpVertex(parallelism);
+ final SlotProvider slots = new SimpleSlotProvider(jid, parallelism, new NotCancelAckingTaskGateway());
+ final TriggeredRestartStrategy restartStrategy = new TriggeredRestartStrategy(restartTrigger);
+
+ final ExecutionGraph eg = createSimpleTestGraph(jid, slots, restartStrategy, vertex);
+ eg.setScheduleMode(ScheduleMode.EAGER);
+ eg.scheduleForExecution();
+
+ waitUntilDeployedAndSwitchToRunning(eg, 1000);
+
+ // fail into 'RESTARTING'
+ eg.failGlobal(new Exception("intended test failure 1"));
+ assertEquals(JobStatus.FAILING, eg.getState());
+ completeCancellingForAllVertices(eg);
+ waitUntilJobStatus(eg, JobStatus.RESTARTING, 1000);
+
+ eg.failGlobal(new Exception("intended test failure 2"));
+ assertEquals(JobStatus.RESTARTING, eg.getState());
+
+ // release the latch so that both pending restart attempts kick in concurrently
+ restartTrigger.trigger();
+
+ waitUntilJobStatus(eg, JobStatus.RUNNING, 1000);
+ waitUntilDeployedAndSwitchToRunning(eg, 1000);
+ finishAllVertices(eg);
+
+ eg.waitUntilTerminal();
+ assertEquals(JobStatus.FINISHED, eg.getState());
+
+ if (eg.getNumberOfFullRestarts() > 2) {
+ fail("Too many restarts: " + eg.getNumberOfFullRestarts());
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
private static class ControllableRestartStrategy implements RestartStrategy {
private Promise<Boolean> reachedCanRestart = new Promise.DefaultPromise<>();
@@ -727,4 +837,49 @@ public class ExecutionGraphRestartTest extends TestLogger {
assertEquals(JobStatus.FINISHED, eg.getState());
}
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * A TaskManager gateway that does not ack cancellations.
+ */
+ private static final class NotCancelAckingTaskGateway extends SimpleAckingTaskManagerGateway {
+
+ @Override
+ public org.apache.flink.runtime.concurrent.Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+ return new FlinkCompletableFuture<>();
+ }
+ }
+
+ /**
+ * A RestartStrategy that blocks restarting on a given {@link OneShotLatch}.
+ */
+ private static final class TriggeredRestartStrategy implements RestartStrategy {
+
+ private final OneShotLatch latch;
+
+ TriggeredRestartStrategy(OneShotLatch latch) {
+ this.latch = latch;
+ }
+
+ @Override
+ public boolean canRestart() {
+ return true;
+ }
+
+ @Override
+ public void restart(final RestartCallback restarter, ScheduledExecutor executor) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ restarter.triggerFullRecovery();
+ }
+ });
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e6348fbd/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 51b5c7f..83c8fb0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -170,8 +170,8 @@ public class ExecutionGraphTestUtils {
}
/**
- * Takes all vertices in the given ExecutionGraph and switches their current
- * execution to RUNNING.
+ * Takes all vertices in the given ExecutionGraph and attempts to move them
+ * from CANCELING to CANCELED.
*/
public static void completeCancellingForAllVertices(ExecutionGraph eg) {
for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
@@ -181,7 +181,7 @@ public class ExecutionGraphTestUtils {
/**
* Takes all vertices in the given ExecutionGraph and switches their current
- * execution to RUNNING.
+ * execution to FINISHED.
*/
public static void finishAllVertices(ExecutionGraph eg) {
for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
@@ -189,6 +189,35 @@ public class ExecutionGraphTestUtils {
}
}
+ /**
+ * Turns a newly scheduled execution graph into a state where all vertices run.
+ * This waits until all executions have reached state 'DEPLOYING' and then switches them to running.
+ */
+ public static void waitUntilDeployedAndSwitchToRunning(ExecutionGraph eg, long timeout) throws TimeoutException {
+ // wait until every execution has reached state 'DEPLOYING'
+ for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
+ final Execution exec = ev.getCurrentExecutionAttempt();
+ waitUntilExecutionState(exec, ExecutionState.DEPLOYING, timeout);
+ }
+
+ // Note: As ugly as it is, we need this minor sleep, because between switching
+ // to 'DEPLOYING' and the point where 'switchToRunning()' may be called lies a race check
+ // against concurrent modifications (cancel / fail). We can only switch this to running
+ // once that check has passed. In the actual runtime, this switch is triggered by a callback
+ // from the TaskManager, which comes strictly after that. For tests, we use mock TaskManagers
+ // which cannot easily tell us when that condition has happened, unfortunately.
+ try {
+ Thread.sleep(2);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
+ final Execution exec = ev.getCurrentExecutionAttempt();
+ exec.switchToRunning();
+ }
+ }
+
// ------------------------------------------------------------------------
// state modifications
// ------------------------------------------------------------------------