You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/04/29 11:23:18 UTC
flink git commit: Revert "[FLINK-3800] [jobmanager] Terminate ExecutionGraphs properly"
Repository: flink
Updated Branches:
refs/heads/release-1.0 f1d34b17b -> 0708dd08b
Revert "[FLINK-3800] [jobmanager] Terminate ExecutionGraphs properly"
This reverts commit 014a686ec5ea0e8809a5235fe988f828e5e70833.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0708dd08
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0708dd08
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0708dd08
Branch: refs/heads/release-1.0
Commit: 0708dd08b57d8cfca20c470ab4e909ea56cb9a38
Parents: f1d34b1
Author: Ufuk Celebi <uc...@apache.org>
Authored: Fri Apr 29 11:22:33 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Fri Apr 29 11:22:33 2016 +0200
----------------------------------------------------------------------
.../runtime/executiongraph/ExecutionGraph.java | 28 +--
.../restart/FixedDelayRestartStrategy.java | 30 +--
.../restart/NoRestartStrategy.java | 17 +-
.../executiongraph/restart/RestartStrategy.java | 5 -
.../restart/RestartStrategyFactory.java | 28 +--
.../flink/runtime/jobmanager/JobManager.scala | 16 +-
.../JobManagerLeaderElectionTest.java | 2 +-
.../LeaderChangeJobRecoveryTest.java | 198 -------------------
.../LeaderChangeStateCleanupTest.java | 2 +-
.../LeaderElectionRetrievalTestingCluster.java | 23 +--
.../runtime/testingUtils/TestingCluster.scala | 10 +-
.../testingUtils/TestingJobManager.scala | 6 +-
.../flink/yarn/TestingYarnJobManager.scala | 8 +-
.../org/apache/flink/yarn/YarnJobManager.scala | 8 +-
14 files changed, 49 insertions(+), 332 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 8cb1ded..7cb83cd 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -790,14 +790,6 @@ public class ExecutionGraph implements Serializable {
}
public void fail(Throwable t) {
- if (t instanceof UnrecoverableException) {
- if (restartStrategy != null) {
- // disable the restart strategy in case that we have seen a SuppressRestartsException
- // it basically overrides the restart behaviour of a the root cause
- restartStrategy.disable();
- }
- }
-
while (true) {
JobStatus current = state;
if (current == JobStatus.FAILING || current.isTerminalState()) {
@@ -1021,17 +1013,15 @@ public class ExecutionGraph implements Serializable {
}
}
else if (current == JobStatus.FAILING) {
- if (restartStrategy.canRestart() && transitionState(current, JobStatus.RESTARTING)) {
- // double check in case that in the meantime a SuppressRestartsException was thrown
- if (restartStrategy.canRestart()) {
- restartStrategy.restart(this);
- break;
- } else {
- fail(new Exception("ExecutionGraph went into RESTARTING state but " +
- "then the restart strategy was disabled."));
- }
-
- } else if (!restartStrategy.canRestart() && transitionState(current, JobStatus.FAILED, failureCause)) {
+ boolean isRecoverable = !(failureCause instanceof UnrecoverableException);
+
+ if (isRecoverable && restartStrategy.canRestart() &&
+ transitionState(current, JobStatus.RESTARTING)) {
+ restartStrategy.restart(this);
+ break;
+
+ } else if ((!isRecoverable || !restartStrategy.canRestart()) &&
+ transitionState(current, JobStatus.FAILED, failureCause)) {
postRunCleanup();
break;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
index 464b48e..d3c7eba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
@@ -41,7 +41,6 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
private final int maxNumberRestartAttempts;
private final long delayBetweenRestartAttempts;
private int currentRestartAttempt;
- private boolean disabled = false;
public FixedDelayRestartStrategy(
int maxNumberRestartAttempts,
@@ -61,7 +60,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
@Override
public boolean canRestart() {
- return !disabled && currentRestartAttempt < maxNumberRestartAttempts;
+ return currentRestartAttempt < maxNumberRestartAttempts;
}
@Override
@@ -84,11 +83,6 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
}, executionGraph.getExecutionContext());
}
- @Override
- public void disable() {
- disabled = true;
- }
-
/**
* Creates a FixedDelayRestartStrategy from the given Configuration.
*
@@ -96,7 +90,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
* @return Initialized instance of FixedDelayRestartStrategy
* @throws Exception
*/
- public static FixedDelayRestartStrategyFactory createFactory(Configuration configuration) throws Exception {
+ public static FixedDelayRestartStrategy create(Configuration configuration) throws Exception {
int maxAttempts = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
String timeoutString = configuration.getString(
@@ -124,7 +118,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
}
}
- return new FixedDelayRestartStrategyFactory(maxAttempts, delay);
+ return new FixedDelayRestartStrategy(maxAttempts, delay);
}
@Override
@@ -134,22 +128,4 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
", delayBetweenRestartAttempts=" + delayBetweenRestartAttempts +
')';
}
-
- public static class FixedDelayRestartStrategyFactory extends RestartStrategyFactory {
-
- private static final long serialVersionUID = 6642934067762271950L;
-
- private final int maxAttempts;
- private final long delay;
-
- public FixedDelayRestartStrategyFactory(int maxAttempts, long delay) {
- this.maxAttempts = maxAttempts;
- this.delay = delay;
- }
-
- @Override
- public RestartStrategy createRestartStrategy() {
- return new FixedDelayRestartStrategy(maxAttempts, delay);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
index 6cc5ee4..8911a98 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
@@ -36,31 +36,18 @@ public class NoRestartStrategy implements RestartStrategy {
throw new RuntimeException("NoRestartStrategy does not support restart.");
}
- @Override
- public void disable() {}
-
/**
* Creates a NoRestartStrategy instance.
*
* @param configuration Configuration object which is ignored
* @return NoRestartStrategy instance
*/
- public static NoRestartStrategyFactory createFactory(Configuration configuration) {
- return new NoRestartStrategyFactory();
+ public static NoRestartStrategy create(Configuration configuration) {
+ return new NoRestartStrategy();
}
@Override
public String toString() {
return "NoRestartStrategy";
}
-
- public static class NoRestartStrategyFactory extends RestartStrategyFactory {
-
- private static final long serialVersionUID = -1809462525812787862L;
-
- @Override
- public RestartStrategy createRestartStrategy() {
- return new NoRestartStrategy();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
index c9e6277..2880c01 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
@@ -38,9 +38,4 @@ public interface RestartStrategy {
* @param executionGraph The ExecutionGraph to be restarted
*/
void restart(ExecutionGraph executionGraph);
-
- /**
- * Disables the restart strategy.
- */
- void disable();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
index e58d775..68d114e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
@@ -25,21 +25,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;
-import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-public abstract class RestartStrategyFactory implements Serializable {
- private static final long serialVersionUID = 7320252552640522191L;
-
+public class RestartStrategyFactory {
private static final Logger LOG = LoggerFactory.getLogger(RestartStrategyFactory.class);
- private static final String CREATE_METHOD = "createFactory";
-
- /**
- * Factory method to create a restart strategy
- * @return The created restart strategy
- */
- public abstract RestartStrategy createRestartStrategy();
+ private static final String CREATE_METHOD = "create";
/**
* Creates a {@link RestartStrategy} instance from the given {@link org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration}.
@@ -67,10 +58,11 @@ public abstract class RestartStrategyFactory implements Serializable {
/**
* Creates a {@link RestartStrategy} instance from the given {@link Configuration}.
*
+ * @param configuration Configuration object containing the configuration values.
* @return RestartStrategy instance
* @throws Exception which indicates that the RestartStrategy could not be instantiated.
*/
- public static RestartStrategyFactory createRestartStrategyFactory(Configuration configuration) throws Exception {
+ public static RestartStrategy createFromConfig(Configuration configuration) throws Exception {
String restartStrategyName = configuration.getString(ConfigConstants.RESTART_STRATEGY, "none").toLowerCase();
switch (restartStrategyName) {
@@ -100,16 +92,16 @@ public abstract class RestartStrategyFactory implements Serializable {
}
if (numberExecutionRetries > 0 && delay >= 0) {
- return new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(numberExecutionRetries, delay);
+ return new FixedDelayRestartStrategy(numberExecutionRetries, delay);
} else {
- return NoRestartStrategy.createFactory(configuration);
+ return NoRestartStrategy.create(configuration);
}
case "off":
case "disable":
- return NoRestartStrategy.createFactory(configuration);
+ return NoRestartStrategy.create(configuration);
case "fixeddelay":
case "fixed-delay":
- return FixedDelayRestartStrategy.createFactory(configuration);
+ return FixedDelayRestartStrategy.create(configuration);
default:
try {
Class<?> clazz = Class.forName(restartStrategyName);
@@ -121,7 +113,7 @@ public abstract class RestartStrategyFactory implements Serializable {
Object result = method.invoke(null, configuration);
if (result != null) {
- return (RestartStrategyFactory) result;
+ return (RestartStrategy) result;
}
}
}
@@ -136,7 +128,7 @@ public abstract class RestartStrategyFactory implements Serializable {
}
// fallback in case of an error
- return NoRestartStrategy.createFactory(configuration);
+ return NoRestartStrategy.create(configuration);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 6391c82..cee5606 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -28,7 +28,6 @@ import akka.actor._
import akka.pattern.ask
import grizzled.slf4j.Logger
-import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration
import org.apache.flink.api.common.{ExecutionConfig, JobID}
import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
@@ -112,7 +111,7 @@ class JobManager(
protected val scheduler: FlinkScheduler,
protected val libraryCacheManager: BlobLibraryCacheManager,
protected val archive: ActorRef,
- protected val restartStrategyFactory: RestartStrategyFactory,
+ protected val defaultRestartStrategy: RestartStrategy,
protected val timeout: FiniteDuration,
protected val leaderElectionService: LeaderElectionService,
protected val submittedJobGraphs : SubmittedJobGraphStore,
@@ -201,7 +200,7 @@ class JobManager(
log.info(s"Stopping JobManager $getAddress.")
val newFuturesToComplete = cancelAndClearEverything(
- new UnrecoverableException(new Exception("The JobManager is shutting down.")),
+ new Exception("The JobManager is shutting down."),
removeJobFromStateBackend = true)
implicit val executionContext = context.dispatcher
@@ -298,7 +297,7 @@ class JobManager(
log.info(s"JobManager ${self.path.toSerializationFormat} was revoked leadership.")
val newFuturesToComplete = cancelAndClearEverything(
- new UnrecoverableException(new Exception("JobManager is no longer the leader.")),
+ new Exception("JobManager is no longer the leader."),
removeJobFromStateBackend = false)
futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) ++ newFuturesToComplete)
@@ -951,7 +950,7 @@ class JobManager(
val restartStrategy = Option(jobGraph.getRestartStrategyConfiguration())
.map(RestartStrategyFactory.createRestartStrategy(_)) match {
case Some(strategy) => strategy
- case None => restartStrategyFactory.createRestartStrategy()
+ case None => defaultRestartStrategy
}
log.info(s"Using restart strategy $restartStrategy for $jobId.")
@@ -1495,7 +1494,7 @@ class JobManager(
* @param cause Cause for the cancelling.
*/
private def cancelAndClearEverything(
- cause: UnrecoverableException,
+ cause: Throwable,
removeJobFromStateBackend: Boolean)
: Seq[Future[Unit]] = {
val futures = for ((jobID, (eg, jobInfo)) <- currentJobs) yield {
@@ -2097,7 +2096,7 @@ object JobManager {
InstanceManager,
FlinkScheduler,
BlobLibraryCacheManager,
- RestartStrategyFactory,
+ RestartStrategy,
FiniteDuration, // timeout
Int, // number of archived jobs
LeaderElectionService,
@@ -2113,7 +2112,8 @@ object JobManager {
ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000
- val restartStrategy = RestartStrategyFactory.createRestartStrategyFactory(configuration)
+ val restartStrategy = RestartStrategyFactory
+ .createFromConfig(configuration)
val archiveCount = configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_ARCHIVE_COUNT,
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT)
http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index afc46a7..fe35c0d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -192,7 +192,7 @@ public class JobManagerLeaderElectionTest extends TestLogger {
new Scheduler(TestingUtils.defaultExecutionContext()),
new BlobLibraryCacheManager(new BlobServer(configuration), 10L),
ActorRef.noSender(),
- new NoRestartStrategy.NoRestartStrategyFactory(),
+ new NoRestartStrategy(),
AkkaUtils.getDefaultTimeout(),
leaderElectionService,
submittedJobGraphStore,
http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
deleted file mode 100644
index b13ae81..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.leaderelection;
-
-import akka.actor.ActorRef;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.Tasks;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.messages.ExecutionGraphMessages;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.apache.flink.util.TestLogger;
-import org.junit.Before;
-import org.junit.Test;
-import scala.concurrent.Await;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.junit.Assert.assertTrue;
-
-public class LeaderChangeJobRecoveryTest extends TestLogger {
-
- private static FiniteDuration timeout = FiniteDuration.apply(30, TimeUnit.SECONDS);
-
- private int numTMs = 1;
- private int numSlotsPerTM = 1;
- private int parallelism = numTMs * numSlotsPerTM;
-
- private Configuration configuration;
- private LeaderElectionRetrievalTestingCluster cluster = null;
- private JobGraph job = createBlockingJob(parallelism);
-
- @Before
- public void before() throws TimeoutException, InterruptedException {
- Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(true);
-
- configuration = new Configuration();
-
- configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1);
- configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
- configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
-
- cluster = new LeaderElectionRetrievalTestingCluster(configuration, true, false, new FixedDelayRestartStrategy(9999, 100));
- cluster.start(false);
-
- // wait for actors to be alive so that they have started their leader retrieval service
- cluster.waitForActorsToBeAlive();
- }
-
- /**
- * Tests that the job is not restarted or at least terminates eventually in case that the
- * JobManager loses its leadership.
- *
- * @throws Exception
- */
- @Test
- public void testNotRestartedWhenLosingLeadership() throws Exception {
- UUID leaderSessionID = UUID.randomUUID();
-
- cluster.grantLeadership(0, leaderSessionID);
- cluster.notifyRetrievalListeners(0, leaderSessionID);
-
- cluster.waitForTaskManagersToBeRegistered(timeout);
-
- cluster.submitJobDetached(job);
-
- ActorGateway jm = cluster.getLeaderGateway(timeout);
-
- Future<Object> wait = jm.ask(new TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished(job.getJobID()), timeout);
-
- Await.ready(wait, timeout);
-
- Future<Object> futureExecutionGraph = jm.ask(new TestingJobManagerMessages.RequestExecutionGraph(job.getJobID()), timeout);
-
- TestingJobManagerMessages.ResponseExecutionGraph responseExecutionGraph =
- (TestingJobManagerMessages.ResponseExecutionGraph) Await.result(futureExecutionGraph, timeout);
-
- assertTrue(responseExecutionGraph instanceof TestingJobManagerMessages.ExecutionGraphFound);
-
- ExecutionGraph executionGraph = ((TestingJobManagerMessages.ExecutionGraphFound) responseExecutionGraph).executionGraph();
-
- TestActorGateway testActorGateway = new TestActorGateway();
-
- executionGraph.registerJobStatusListener(testActorGateway);
-
- cluster.revokeLeadership();
-
- Future<Boolean> hasReachedTerminalState = testActorGateway.hasReachedTerminalState();
-
- assertTrue("The job should have reached a terminal state.", Await.result(hasReachedTerminalState, timeout));
- }
-
- public JobGraph createBlockingJob(int parallelism) {
- Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(true);
-
- JobVertex sender = new JobVertex("sender");
- JobVertex receiver = new JobVertex("receiver");
-
- sender.setInvokableClass(Tasks.Sender.class);
- receiver.setInvokableClass(Tasks.BlockingOnceReceiver.class);
-
- sender.setParallelism(parallelism);
- receiver.setParallelism(parallelism);
-
- receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
-
- SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
- sender.setSlotSharingGroup(slotSharingGroup);
- receiver.setSlotSharingGroup(slotSharingGroup);
-
- return new JobGraph("Blocking test job", sender, receiver);
- }
-
- public static class TestActorGateway implements ActorGateway {
-
- private static final long serialVersionUID = -736146686160538227L;
- private transient Promise<Boolean> terminalState = new scala.concurrent.impl.Promise.DefaultPromise<>();
-
- public Future<Boolean> hasReachedTerminalState() {
- return terminalState.future();
- }
-
- @Override
- public Future<Object> ask(Object message, FiniteDuration timeout) {
- return null;
- }
-
- @Override
- public void tell(Object message) {
- this.tell(message, new AkkaActorGateway(ActorRef.noSender(), null));
- }
-
- @Override
- public void tell(Object message, ActorGateway sender) {
- if (message instanceof ExecutionGraphMessages.JobStatusChanged) {
- ExecutionGraphMessages.JobStatusChanged jobStatusChanged = (ExecutionGraphMessages.JobStatusChanged) message;
-
- if (jobStatusChanged.newJobStatus().isTerminalState()) {
- terminalState.success(true);
- }
- }
- }
-
- @Override
- public void forward(Object message, ActorGateway sender) {
-
- }
-
- @Override
- public Future<Object> retry(Object message, int numberRetries, FiniteDuration timeout, ExecutionContext executionContext) {
- return null;
- }
-
- @Override
- public String path() {
- return null;
- }
-
- @Override
- public ActorRef actor() {
- return null;
- }
-
- @Override
- public UUID leaderSessionID() {
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
index 7876ff7..c490a64 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
@@ -67,7 +67,7 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
- cluster = new LeaderElectionRetrievalTestingCluster(configuration, true, false, null);
+ cluster = new LeaderElectionRetrievalTestingCluster(configuration, true, false);
cluster.start(false); // TaskManagers don't have to register at the JobManager
cluster.waitForActorsToBeAlive(); // we only wait until all actors are alive
http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
index cd89fa6..c8cf868 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.leaderelection;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import scala.Option;
@@ -39,7 +38,6 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster {
private final Configuration userConfiguration;
private final boolean useSingleActorSystem;
- private final RestartStrategy restartStrategy;
public List<TestingLeaderElectionService> leaderElectionServices;
public List<TestingLeaderRetrievalService> leaderRetrievalServices;
@@ -49,8 +47,7 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster {
public LeaderElectionRetrievalTestingCluster(
Configuration userConfiguration,
boolean singleActorSystem,
- boolean synchronousDispatcher,
- RestartStrategy restartStrategy) {
+ boolean synchronousDispatcher) {
super(userConfiguration, singleActorSystem, synchronousDispatcher);
this.userConfiguration = userConfiguration;
@@ -58,8 +55,6 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster {
leaderElectionServices = new ArrayList<TestingLeaderElectionService>();
leaderRetrievalServices = new ArrayList<TestingLeaderRetrievalService>();
-
- this.restartStrategy = restartStrategy;
}
@Override
@@ -95,15 +90,6 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster {
ConfigConstants.DEFAULT_LOCAL_NUMBER_JOB_MANAGER);
}
- @Override
- public RestartStrategy getRestartStrategy(RestartStrategy other) {
- if (this.restartStrategy != null) {
- return this.restartStrategy;
- } else {
- return other;
- }
- }
-
public void grantLeadership(int index, UUID leaderSessionID) {
if(leaderIndex >= 0) {
// first revoke leadership
@@ -123,11 +109,4 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster {
service.notifyListener(address, leaderSessionID);
}
}
-
- public void revokeLeadership() {
- if (leaderIndex >= 0) {
- leaderElectionServices.get(leaderIndex).notLeader();
- leaderIndex = -1;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 8f321bb..c72eb50 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -24,10 +24,10 @@ import akka.pattern.ask
import akka.actor.{ActorRef, Props, ActorSystem}
import akka.testkit.CallingThreadDispatcher
import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategy
import org.apache.flink.runtime.jobmanager.JobManager
import org.apache.flink.runtime.leaderelection.LeaderElectionService
import org.apache.flink.runtime.minicluster.FlinkMiniCluster
+import org.apache.flink.util.NetUtils
import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.testingUtils.TestingMessages.Alive
@@ -96,7 +96,7 @@ class TestingCluster(
instanceManager,
scheduler,
libraryCacheManager,
- restartStrategyFactory,
+ restartStrategy,
timeout,
archiveCount,
leaderElectionService,
@@ -118,7 +118,7 @@ class TestingCluster(
scheduler,
libraryCacheManager,
archive,
- restartStrategyFactory,
+ restartStrategy,
timeout,
leaderElectionService,
submittedJobsGraphs,
@@ -155,10 +155,6 @@ class TestingCluster(
None
}
- def getRestartStrategy(restartStrategy: RestartStrategy) = {
- restartStrategy
- }
-
@throws(classOf[TimeoutException])
@throws(classOf[InterruptedException])
def waitForTaskManagersToBeAlive(): Unit = {
http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index e854b13..53867e0 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -23,7 +23,7 @@ import akka.actor.ActorRef
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory}
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy
import org.apache.flink.runtime.instance.InstanceManager
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
@@ -44,7 +44,7 @@ class TestingJobManager(
scheduler: Scheduler,
libraryCacheManager: BlobLibraryCacheManager,
archive: ActorRef,
- restartStrategyFactory: RestartStrategyFactory,
+ restartStrategy: RestartStrategy,
timeout: FiniteDuration,
leaderElectionService: LeaderElectionService,
submittedJobGraphs : SubmittedJobGraphStore,
@@ -58,7 +58,7 @@ class TestingJobManager(
scheduler,
libraryCacheManager,
archive,
- restartStrategyFactory,
+ restartStrategy,
timeout,
leaderElectionService,
submittedJobGraphs,
http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
index 717d631..4e6b745 100644
--- a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
+++ b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
@@ -24,7 +24,7 @@ import akka.actor.ActorRef
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory}
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy
import org.apache.flink.runtime.instance.InstanceManager
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
@@ -46,7 +46,7 @@ import scala.concurrent.duration.FiniteDuration
* @param scheduler Scheduler to schedule Flink jobs
* @param libraryCacheManager Manager to manage uploaded jar files
* @param archive Archive for finished Flink jobs
- * @param restartStrategyFactory Default restart strategy for job restarts
+ * @param restartStrategy Default restart strategy for job restarts
* @param timeout Timeout for futures
* @param leaderElectionService LeaderElectionService to participate in the leader election
*/
@@ -57,7 +57,7 @@ class TestingYarnJobManager(
scheduler: Scheduler,
libraryCacheManager: BlobLibraryCacheManager,
archive: ActorRef,
- restartStrategyFactory: RestartStrategyFactory,
+ restartStrategy: RestartStrategy,
timeout: FiniteDuration,
leaderElectionService: LeaderElectionService,
submittedJobGraphs : SubmittedJobGraphStore,
@@ -71,7 +71,7 @@ class TestingYarnJobManager(
scheduler,
libraryCacheManager,
archive,
- restartStrategyFactory,
+ restartStrategy,
timeout,
leaderElectionService,
submittedJobGraphs,
http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index a6d587b..314c5bd 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -33,7 +33,7 @@ import org.apache.flink.api.common.JobID
import org.apache.flink.configuration.{Configuration => FlinkConfiguration, ConfigConstants}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory}
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy
import org.apache.flink.runtime.jobgraph.JobStatus
import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphStore, JobManager}
import org.apache.flink.runtime.leaderelection.LeaderElectionService
@@ -75,7 +75,7 @@ import scala.util.Try
* @param scheduler Scheduler to schedule Flink jobs
* @param libraryCacheManager Manager to manage uploaded jar files
* @param archive Archive for finished Flink jobs
- * @param restartStrategyFactory Restart strategy to be used in case of a job recovery
+ * @param restartStrategy Restart strategy to be used in case of a job recovery
* @param timeout Timeout for futures
* @param leaderElectionService LeaderElectionService to participate in the leader election
*/
@@ -86,7 +86,7 @@ class YarnJobManager(
scheduler: FlinkScheduler,
libraryCacheManager: BlobLibraryCacheManager,
archive: ActorRef,
- restartStrategyFactory: RestartStrategyFactory,
+ restartStrategy: RestartStrategy,
timeout: FiniteDuration,
leaderElectionService: LeaderElectionService,
submittedJobGraphs : SubmittedJobGraphStore,
@@ -100,7 +100,7 @@ class YarnJobManager(
scheduler,
libraryCacheManager,
archive,
- restartStrategyFactory,
+ restartStrategy,
timeout,
leaderElectionService,
submittedJobGraphs,