You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text copy).
Posted to commits@flink.apache.org by se...@apache.org on 2017/07/20 13:52:21 UTC

[3/3] flink git commit: [FLINK-7231] [distr. coordination] Fix slot release affecting SlotSharingGroup cleanup

[FLINK-7231] [distr. coordination] Fix slot release affecting SlotSharingGroup cleanup


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/39f5b114
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/39f5b114
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/39f5b114

Branch: refs/heads/release-1.3
Commit: 39f5b1144167dcb80e8708f4cb5426e76f648026
Parents: e6348fb
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jul 19 10:24:52 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jul 20 15:43:14 2017 +0200

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  |   9 +-
 .../ExecutionGraphRestartTest.java              | 149 ++++++++++++++++---
 .../executiongraph/ExecutionGraphTestUtils.java |  19 ++-
 .../utils/NotCancelAckingTaskGateway.java       |  32 ++++
 4 files changed, 183 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/39f5b114/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index a7d768b..f9d2d69 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -929,13 +929,14 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 					}
 					catch (Throwable t) {
 						// we catch everything here to make sure cleanup happens and the
-						// ExecutionGraph notices
-						// we need to go into recovery and make sure to release all slots
+						// ExecutionGraph notices the error
+
+						// we need to release all slots before going into recovery!
 						try {
-							failGlobal(t);
+							ExecutionGraphUtils.releaseAllSlotsSilently(resources);
 						}
 						finally {
-							ExecutionGraphUtils.releaseAllSlotsSilently(resources);
+							failGlobal(t);
 						}
 					}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/39f5b114/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 3ce6baa..bd8b4ae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -21,13 +21,12 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
@@ -35,24 +34,33 @@ import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.NotCancelAckingTaskGateway;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
-import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.After;
 import org.junit.Test;
 
 import scala.concurrent.Await;
@@ -62,8 +70,12 @@ import scala.concurrent.duration.FiniteDuration;
 import scala.concurrent.impl.Promise;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.util.Iterator;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
@@ -85,6 +97,15 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 	private final static int NUM_TASKS = 31;
 
+	private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(4);	// multi-threaded on purpose: the slot-sharing restart tests assert a core pool size > 1
+
+	@After
+	public void shutdown() {	// runs after each test
+		executor.shutdownNow();	// stop the executor handed to the tests' Schedulers and ExecutionGraphs
+	}
+
+	// ------------------------------------------------------------------------
+
 	@Test
 	public void testNoManualRestart() throws Exception {
 		NoRestartStrategy restartStrategy = new NoRestartStrategy();
@@ -661,10 +682,117 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		}
 	}
 
+	@Test
+	public void testRestartWithEagerSchedulingAndSlotSharing() throws Exception {	// EAGER scheduling + slot sharing: after one task failure the job must restart and then finish cleanly
+		// this test is inconclusive if not used with a proper multi-threaded executor
+		assertTrue("test assumptions violated", ((ThreadPoolExecutor) executor).getCorePoolSize() > 1);
+
+		final int parallelism = 20;
+		final Scheduler scheduler = createSchedulerWithInstances(parallelism);	// one instance per subtask index; source and sink share slots via sharingGroup
+
+		final SlotSharingGroup sharingGroup = new SlotSharingGroup();
+
+		final JobVertex source = new JobVertex("source");
+		source.setInvokableClass(NoOpInvokable.class);
+		source.setParallelism(parallelism);
+		source.setSlotSharingGroup(sharingGroup);
+
+		final JobVertex sink = new JobVertex("sink");
+		sink.setInvokableClass(NoOpInvokable.class);
+		sink.setParallelism(parallelism);
+		sink.setSlotSharingGroup(sharingGroup);
+		sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED);
+
+		final ExecutionGraph eg = ExecutionGraphTestUtils.createExecutionGraph(
+				new JobID(), scheduler, new FixedDelayRestartStrategy(Integer.MAX_VALUE, 0), executor, source, sink);	// effectively unlimited restarts; 0 presumably the restart delay
+
+		eg.setScheduleMode(ScheduleMode.EAGER);
+		eg.scheduleForExecution();
+
+		waitUntilDeployedAndSwitchToRunning(eg, 1000);
+
+		// fail one task: the job is FAILING until the cancellations complete, then it restarts
+		eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt().fail(
+				new Exception("intended test failure"));
+
+		assertEquals(JobStatus.FAILING, eg.getState());
+		completeCancellingForAllVertices(eg);	// presumably acknowledges cancellation of all vertices so the restart can proceed
+
+		// after the restart: redeploy, finish all vertices and expect a clean FINISHED state
+		waitUntilJobStatus(eg, JobStatus.RUNNING, 1000);
+		waitUntilDeployedAndSwitchToRunning(eg, 1000);
+		finishAllVertices(eg);
+		waitUntilJobStatus(eg, JobStatus.FINISHED, 1000);
+	}
+
+	@Test
+	public void testRestartWithSlotSharingAndNotEnoughResources() throws Exception {	// too few slots: the job must use up its restarts and fail with NoResourceAvailableException
+		// this test is inconclusive if not used with a proper multi-threaded executor
+		assertTrue("test assumptions violated", ((ThreadPoolExecutor) executor).getCorePoolSize() > 1);
+
+		final int numRestarts = 10;
+		final int parallelism = 20;
+
+		final Scheduler scheduler = createSchedulerWithInstances(parallelism - 1);	// one instance short of the parallelism
+
+		final SlotSharingGroup sharingGroup = new SlotSharingGroup();
+
+		final JobVertex source = new JobVertex("source");
+		source.setInvokableClass(NoOpInvokable.class);
+		source.setParallelism(parallelism);
+		source.setSlotSharingGroup(sharingGroup);
+
+		final JobVertex sink = new JobVertex("sink");
+		sink.setInvokableClass(NoOpInvokable.class);
+		sink.setParallelism(parallelism);
+		sink.setSlotSharingGroup(sharingGroup);
+		sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED);
+
+		final ExecutionGraph eg = ExecutionGraphTestUtils.createExecutionGraph(
+				new JobID(), scheduler, new FixedDelayRestartStrategy(numRestarts, 0), executor, source, sink);
+
+		eg.setScheduleMode(ScheduleMode.EAGER);
+		eg.scheduleForExecution();
+
+		// wait until all numRestarts full restarts have happened; no further restarts follow after that
+		while (eg.getNumberOfFullRestarts() < numRestarts) {	// NOTE(review): busy-wait without a deadline; the test hangs instead of failing if restarts stall
+			Thread.sleep(1);
+		}
+
+		waitUntilJobStatus(eg, JobStatus.FAILED, 1000);
+
+		final Throwable t = eg.getFailureCause();
+		if (!(t instanceof NoResourceAvailableException)) {	// any other failure cause is rethrown and fails the test
+			ExceptionUtils.rethrowException(t, t.getMessage());
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
 
+	private Scheduler createSchedulerWithInstances(int num) {	// Scheduler on the test executor with 'num' instances registered (ports 55443, 55444, ...)
+		final Scheduler scheduler = new Scheduler(executor);
+		final Instance[] instances = new Instance[num];
+
+		for (int i = 0; i < instances.length; i++) {
+			instances[i] = createInstance(55443 + i);
+			scheduler.newInstanceAvailable(instances[i]);
+		}
+
+		return scheduler;
+	}
+
+	private static Instance createInstance(int port) {	// test Instance on the loopback address and given port, backed by a SimpleAckingTaskManagerGateway
+		final HardwareDescription resources = new HardwareDescription(4, 1_000_000_000, 500_000_000, 400_000_000);	// NOTE(review): presumably (CPU cores, physical memory, JVM heap, managed memory) in bytes — verify
+		final TaskManagerGateway taskManager = new SimpleAckingTaskManagerGateway();
+		final TaskManagerLocation location = new TaskManagerLocation(
+				ResourceID.generate(), InetAddress.getLoopbackAddress(), port);
+		return new Instance(taskManager, location, new InstanceID(), resources, 1);	// NOTE(review): last argument presumably the number of slots — verify
+	}
+
+	// ------------------------------------------------------------------------
+
 	private static class ControllableRestartStrategy implements RestartStrategy {
 
 		private Promise<Boolean> reachedCanRestart = new Promise.DefaultPromise<>();
@@ -773,10 +901,6 @@ public class ExecutionGraphRestartTest extends TestLogger {
 			scheduler);
 	}
 
-	private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy) throws IOException {
-		return newExecutionGraph(restartStrategy, new Scheduler(TestingUtils.defaultExecutionContext()));
-	}
-
 	private static void restartAfterFailure(ExecutionGraph eg, FiniteDuration timeout, boolean haltAfterRestart) throws InterruptedException {
 		makeAFailureAndWait(eg, timeout);
 
@@ -841,17 +965,6 @@ public class ExecutionGraphRestartTest extends TestLogger {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * A TaskManager gateway that does not ack cancellations.
-	 */
-	private static final class NotCancelAckingTaskGateway extends SimpleAckingTaskManagerGateway {
-
-		@Override
-		public org.apache.flink.runtime.concurrent.Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
-			return new FlinkCompletableFuture<>();
-		}
-	}
-
-	/**
 	 * A RestartStrategy that blocks restarting on a given {@link OneShotLatch}.
 	 */
 	private static final class TriggeredRestartStrategy implements RestartStrategy {

http://git-wip-us.apache.org/repos/asf/flink/blob/39f5b114/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 83c8fb0..dbbd3e0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -321,6 +322,16 @@ public class ExecutionGraphTestUtils {
 			RestartStrategy restartStrategy,
 			JobVertex... vertices) throws Exception {
 
+		return createExecutionGraph(jid, slotProvider, restartStrategy, TestingUtils.defaultExecutor(), vertices);
+	}
+
+	public static ExecutionGraph createExecutionGraph(
+			JobID jid,
+			SlotProvider slotProvider,
+			RestartStrategy restartStrategy,
+			ScheduledExecutorService executor,
+			JobVertex... vertices) throws Exception {
+
 		checkNotNull(jid);
 		checkNotNull(restartStrategy);
 		checkNotNull(vertices);
@@ -329,18 +340,18 @@ public class ExecutionGraphTestUtils {
 				null,
 				new JobGraph(jid, "test job", vertices),
 				new Configuration(),
-				TestingUtils.defaultExecutor(),
-				TestingUtils.defaultExecutor(),
+				executor,
+				executor,
 				slotProvider,
 				ExecutionGraphTestUtils.class.getClassLoader(),
-				mock(CheckpointRecoveryFactory.class),
+				new StandaloneCheckpointRecoveryFactory(),
 				Time.seconds(10),
 				restartStrategy,
 				new UnregisteredMetricsGroup(),
 				1,
 				TEST_LOGGER);
 	}
-	
+
 	public static JobVertex createNoOpVertex(int parallelism) {
 		JobVertex vertex = new JobVertex("vertex");
 		vertex.setInvokableClass(NoOpInvokable.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/39f5b114/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/NotCancelAckingTaskGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/NotCancelAckingTaskGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/NotCancelAckingTaskGateway.java
new file mode 100644
index 0000000..f453d20
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/NotCancelAckingTaskGateway.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.utils;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.Acknowledge;
+
+public class NotCancelAckingTaskGateway extends SimpleAckingTaskManagerGateway {
+	// A TaskManager gateway that does not ack cancellations; all other calls are inherited from SimpleAckingTaskManagerGateway.
+	@Override
+	public org.apache.flink.runtime.concurrent.Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+		return new FlinkCompletableFuture<>();	// fresh, uncompleted future: this gateway never acknowledges the cancellation
+	}
+}
\ No newline at end of file