You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/07/20 13:52:21 UTC
[3/3] flink git commit: [FLINK-7231] [distr. coordination] Fix slot release affecting SlotSharingGroup cleanup
[FLINK-7231] [distr. coordination] Fix slot release affecting SlotSharingGroup cleanup
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/39f5b114
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/39f5b114
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/39f5b114
Branch: refs/heads/release-1.3
Commit: 39f5b1144167dcb80e8708f4cb5426e76f648026
Parents: e6348fb
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jul 19 10:24:52 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jul 20 15:43:14 2017 +0200
----------------------------------------------------------------------
.../runtime/executiongraph/ExecutionGraph.java | 9 +-
.../ExecutionGraphRestartTest.java | 149 ++++++++++++++++---
.../executiongraph/ExecutionGraphTestUtils.java | 19 ++-
.../utils/NotCancelAckingTaskGateway.java | 32 ++++
4 files changed, 183 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/39f5b114/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index a7d768b..f9d2d69 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -929,13 +929,14 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}
catch (Throwable t) {
// we catch everything here to make sure cleanup happens and the
- // ExecutionGraph notices
- // we need to go into recovery and make sure to release all slots
+ // ExecutionGraph notices the error
+
+ // we need to release all slots before going into recovery!
try {
- failGlobal(t);
+ ExecutionGraphUtils.releaseAllSlotsSilently(resources);
}
finally {
- ExecutionGraphUtils.releaseAllSlotsSilently(resources);
+ failGlobal(t);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/39f5b114/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 3ce6baa..bd8b4ae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -21,13 +21,12 @@ package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
@@ -35,24 +34,33 @@ import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.NotCancelAckingTaskGateway;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
-import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
+import org.junit.After;
import org.junit.Test;
import scala.concurrent.Await;
@@ -62,8 +70,12 @@ import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.impl.Promise;
import java.io.IOException;
+import java.net.InetAddress;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
@@ -85,6 +97,15 @@ public class ExecutionGraphRestartTest extends TestLogger {
private final static int NUM_TASKS = 31;
+ private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(4);
+
+ @After
+ public void shutdown() {
+ executor.shutdownNow();
+ }
+
+ // ------------------------------------------------------------------------
+
@Test
public void testNoManualRestart() throws Exception {
NoRestartStrategy restartStrategy = new NoRestartStrategy();
@@ -661,10 +682,117 @@ public class ExecutionGraphRestartTest extends TestLogger {
}
}
+ @Test
+ public void testRestartWithEagerSchedulingAndSlotSharing() throws Exception {
+ // this test is inconclusive if not used with a proper multi-threaded executor
+ assertTrue("test assumptions violated", ((ThreadPoolExecutor) executor).getCorePoolSize() > 1);
+
+ final int parallelism = 20;
+ final Scheduler scheduler = createSchedulerWithInstances(parallelism);
+
+ final SlotSharingGroup sharingGroup = new SlotSharingGroup();
+
+ final JobVertex source = new JobVertex("source");
+ source.setInvokableClass(NoOpInvokable.class);
+ source.setParallelism(parallelism);
+ source.setSlotSharingGroup(sharingGroup);
+
+ final JobVertex sink = new JobVertex("sink");
+ sink.setInvokableClass(NoOpInvokable.class);
+ sink.setParallelism(parallelism);
+ sink.setSlotSharingGroup(sharingGroup);
+ sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED);
+
+ final ExecutionGraph eg = ExecutionGraphTestUtils.createExecutionGraph(
+ new JobID(), scheduler, new FixedDelayRestartStrategy(Integer.MAX_VALUE, 0), executor, source, sink);
+
+ eg.setScheduleMode(ScheduleMode.EAGER);
+ eg.scheduleForExecution();
+
+ waitUntilDeployedAndSwitchToRunning(eg, 1000);
+
+ // fail into 'RESTARTING'
+ eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt().fail(
+ new Exception("intended test failure"));
+
+ assertEquals(JobStatus.FAILING, eg.getState());
+ completeCancellingForAllVertices(eg);
+
+ // clean termination
+ waitUntilJobStatus(eg, JobStatus.RUNNING, 1000);
+ waitUntilDeployedAndSwitchToRunning(eg, 1000);
+ finishAllVertices(eg);
+ waitUntilJobStatus(eg, JobStatus.FINISHED, 1000);
+ }
+
+ @Test
+ public void testRestartWithSlotSharingAndNotEnoughResources() throws Exception {
+ // this test is inconclusive if not used with a proper multi-threaded executor
+ assertTrue("test assumptions violated", ((ThreadPoolExecutor) executor).getCorePoolSize() > 1);
+
+ final int numRestarts = 10;
+ final int parallelism = 20;
+
+ final Scheduler scheduler = createSchedulerWithInstances(parallelism - 1);
+
+ final SlotSharingGroup sharingGroup = new SlotSharingGroup();
+
+ final JobVertex source = new JobVertex("source");
+ source.setInvokableClass(NoOpInvokable.class);
+ source.setParallelism(parallelism);
+ source.setSlotSharingGroup(sharingGroup);
+
+ final JobVertex sink = new JobVertex("sink");
+ sink.setInvokableClass(NoOpInvokable.class);
+ sink.setParallelism(parallelism);
+ sink.setSlotSharingGroup(sharingGroup);
+ sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED);
+
+ final ExecutionGraph eg = ExecutionGraphTestUtils.createExecutionGraph(
+ new JobID(), scheduler, new FixedDelayRestartStrategy(numRestarts, 0), executor, source, sink);
+
+ eg.setScheduleMode(ScheduleMode.EAGER);
+ eg.scheduleForExecution();
+
+ // wait until the execution graph has gone through all permitted full restarts
+ while (eg.getNumberOfFullRestarts() < numRestarts) {
+ Thread.sleep(1);
+ }
+
+ waitUntilJobStatus(eg, JobStatus.FAILED, 1000);
+
+ final Throwable t = eg.getFailureCause();
+ if (!(t instanceof NoResourceAvailableException)) {
+ ExceptionUtils.rethrowException(t, t.getMessage());
+ }
+ }
+
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
+ private Scheduler createSchedulerWithInstances(int num) {
+ final Scheduler scheduler = new Scheduler(executor);
+ final Instance[] instances = new Instance[num];
+
+ for (int i = 0; i < instances.length; i++) {
+ instances[i] = createInstance(55443 + i);
+ scheduler.newInstanceAvailable(instances[i]);
+ }
+
+ return scheduler;
+ }
+
+ private static Instance createInstance(int port) {
+ final HardwareDescription resources = new HardwareDescription(4, 1_000_000_000, 500_000_000, 400_000_000);
+ final TaskManagerGateway taskManager = new SimpleAckingTaskManagerGateway();
+ final TaskManagerLocation location = new TaskManagerLocation(
+ ResourceID.generate(), InetAddress.getLoopbackAddress(), port);
+ return new Instance(taskManager, location, new InstanceID(), resources, 1);
+ }
+
+ // ------------------------------------------------------------------------
+
private static class ControllableRestartStrategy implements RestartStrategy {
private Promise<Boolean> reachedCanRestart = new Promise.DefaultPromise<>();
@@ -773,10 +901,6 @@ public class ExecutionGraphRestartTest extends TestLogger {
scheduler);
}
- private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy) throws IOException {
- return newExecutionGraph(restartStrategy, new Scheduler(TestingUtils.defaultExecutionContext()));
- }
-
private static void restartAfterFailure(ExecutionGraph eg, FiniteDuration timeout, boolean haltAfterRestart) throws InterruptedException {
makeAFailureAndWait(eg, timeout);
@@ -841,17 +965,6 @@ public class ExecutionGraphRestartTest extends TestLogger {
// ------------------------------------------------------------------------
/**
- * A TaskManager gateway that does not ack cancellations.
- */
- private static final class NotCancelAckingTaskGateway extends SimpleAckingTaskManagerGateway {
-
- @Override
- public org.apache.flink.runtime.concurrent.Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
- return new FlinkCompletableFuture<>();
- }
- }
-
- /**
* A RestartStrategy that blocks restarting on a given {@link OneShotLatch}.
*/
private static final class TriggeredRestartStrategy implements RestartStrategy {
http://git-wip-us.apache.org/repos/asf/flink/blob/39f5b114/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 83c8fb0..dbbd3e0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -321,6 +322,16 @@ public class ExecutionGraphTestUtils {
RestartStrategy restartStrategy,
JobVertex... vertices) throws Exception {
+ return createExecutionGraph(jid, slotProvider, restartStrategy, TestingUtils.defaultExecutor(), vertices);
+ }
+
+ public static ExecutionGraph createExecutionGraph(
+ JobID jid,
+ SlotProvider slotProvider,
+ RestartStrategy restartStrategy,
+ ScheduledExecutorService executor,
+ JobVertex... vertices) throws Exception {
+
checkNotNull(jid);
checkNotNull(restartStrategy);
checkNotNull(vertices);
@@ -329,18 +340,18 @@ public class ExecutionGraphTestUtils {
null,
new JobGraph(jid, "test job", vertices),
new Configuration(),
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
+ executor,
+ executor,
slotProvider,
ExecutionGraphTestUtils.class.getClassLoader(),
- mock(CheckpointRecoveryFactory.class),
+ new StandaloneCheckpointRecoveryFactory(),
Time.seconds(10),
restartStrategy,
new UnregisteredMetricsGroup(),
1,
TEST_LOGGER);
}
-
+
public static JobVertex createNoOpVertex(int parallelism) {
JobVertex vertex = new JobVertex("vertex");
vertex.setInvokableClass(NoOpInvokable.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/39f5b114/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/NotCancelAckingTaskGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/NotCancelAckingTaskGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/NotCancelAckingTaskGateway.java
new file mode 100644
index 0000000..f453d20
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/NotCancelAckingTaskGateway.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.utils;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.Acknowledge;
+
+public class NotCancelAckingTaskGateway extends SimpleAckingTaskManagerGateway {
+
+ @Override
+ public org.apache.flink.runtime.concurrent.Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+ return new FlinkCompletableFuture<>();
+ }
+}
\ No newline at end of file