You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/10/21 14:52:47 UTC

[1/3] flink git commit: [hotfix] Harden SlotManagerTest#testTaskManagerTimeoutDoesNotRemoveSlots

Repository: flink
Updated Branches:
  refs/heads/master 6c5ecc2f4 -> e4849464f


[hotfix] Harden SlotManagerTest#testTaskManagerTimeoutDoesNotRemoveSlots


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e4849464
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e4849464
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e4849464

Branch: refs/heads/master
Commit: e4849464f070e987f7fff5ded79ec47f65dd6353
Parents: bee8c99
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Oct 20 16:25:00 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sat Oct 21 16:52:36 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e4849464/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 726f224..c4c320a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -1057,7 +1057,7 @@ public class SlotManagerTest extends TestLogger {
 			assertEquals(1, slotManager.getNumberRegisteredSlots());
 
 			// wait for the timeout call to happen
-			verify(resourceActions, timeout(taskManagerTimeout.toMilliseconds() * 3L)).releaseResource(eq(taskExecutorConnection.getInstanceID()));
+			verify(resourceActions, timeout(taskManagerTimeout.toMilliseconds() * 3L).atLeast(1)).releaseResource(eq(taskExecutorConnection.getInstanceID()));
 
 			assertEquals(1, slotManager.getNumberRegisteredSlots());
 


[3/3] flink git commit: [FLINK-7793] [flip6] Defer slot release to ResourceManager

Posted by tr...@apache.org.
[FLINK-7793] [flip6] Defer slot release to ResourceManager

This commit changes the SlotManager behaviour such that, upon a TaskManager timeout,
the ResourceManager is only notified about it; the slots are not removed. The
ResourceManager can then decide whether to stop the TaskManager and remove its slots
from the SlotManager, or to keep the TaskManager around.

Add test case

This closes #4795.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e0b98969
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e0b98969
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e0b98969

Branch: refs/heads/master
Commit: e0b98969ee4cd16b6667a416c0c2ffb60132fb12
Parents: 6c5ecc2
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Oct 10 18:39:40 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sat Oct 21 16:52:36 2017 +0200

----------------------------------------------------------------------
 .../clusterframework/MesosResourceManager.java  |  5 +-
 .../MesosResourceManagerTest.java               | 38 +++++++--
 .../resourcemanager/ResourceManager.java        | 77 +++++++++---------
 .../StandaloneResourceManager.java              |  5 +-
 .../slotmanager/ResourceActions.java            | 55 +++++++++++++
 .../slotmanager/ResourceManagerActions.java     | 34 --------
 .../slotmanager/SlotManager.java                | 43 +++++-----
 .../slotmanager/SlotManagerTest.java            | 83 +++++++++++++++-----
 .../slotmanager/SlotProtocolTest.java           |  4 +-
 .../apache/flink/yarn/YarnResourceManager.java  |  3 +-
 10 files changed, 225 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e0b98969/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index 8a8f208..1e32b2c 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -421,8 +421,9 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 	}
 
 	@Override
-	public void stopWorker(ResourceID resourceID) {
+	public boolean stopWorker(ResourceID resourceID) {
 		LOG.info("Stopping worker {}.", resourceID);
+
 		try {
 
 			if (workersInLaunch.containsKey(resourceID)) {
@@ -449,6 +450,8 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 		catch (Exception e) {
 			onFatalError(new ResourceManagerException("Unable to release a worker.", e));
 		}
+
+		return true;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/e0b98969/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index dbd0746..4bdd9a3 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.mesos.runtime.clusterframework;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
@@ -57,7 +58,7 @@ import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
-import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceManagerActions;
+import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceActions;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -92,6 +93,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
@@ -199,6 +201,12 @@ public class MesosResourceManagerTest extends TestLogger {
 			super.closeTaskManagerConnection(resourceID, cause);
 			closedTaskManagerConnections.add(resourceID);
 		}
+
+		@VisibleForTesting
+		@Override
+		public <V> CompletableFuture<V> callAsync(Callable<V> callable, Time timeout) {
+			return super.callAsync(callable, timeout);
+		}
 	}
 
 	/**
@@ -303,7 +311,7 @@ public class MesosResourceManagerTest extends TestLogger {
 			public final JobLeaderIdService jobLeaderIdService;
 			public final SlotManager slotManager;
 			public final CompletableFuture<Boolean> slotManagerStarted;
-			public ResourceManagerActions rmActions;
+			public ResourceActions rmActions;
 
 			public UUID rmLeaderSessionId;
 
@@ -324,11 +332,11 @@ public class MesosResourceManagerTest extends TestLogger {
 				doAnswer(new Answer<Object>() {
 					@Override
 					public Object answer(InvocationOnMock invocation) throws Throwable {
-						rmActions = invocation.getArgumentAt(2, ResourceManagerActions.class);
+						rmActions = invocation.getArgumentAt(2, ResourceActions.class);
 						slotManagerStarted.complete(true);
 						return null;
 					}
-				}).when(slotManager).start(any(ResourceManagerId.class), any(Executor.class), any(ResourceManagerActions.class));
+				}).when(slotManager).start(any(ResourceManagerId.class), any(Executor.class), any(ResourceActions.class));
 
 				when(slotManager.registerSlotRequest(any(SlotRequest.class))).thenReturn(true);
 			}
@@ -453,9 +461,18 @@ public class MesosResourceManagerTest extends TestLogger {
 		public MesosWorkerStore.Worker allocateWorker(Protos.TaskID taskID, ResourceProfile resourceProfile) throws Exception {
 			when(rmServices.workerStore.newTaskID()).thenReturn(taskID);
 			rmServices.slotManagerStarted.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-			rmServices.rmActions.allocateResource(resourceProfile);
+
+			CompletableFuture<Void> allocateResourceFuture = resourceManager.callAsync(
+				() -> {
+					rmServices.rmActions.allocateResource(resourceProfile);
+					return null;
+				},
+				timeout);
 			MesosWorkerStore.Worker expected = MesosWorkerStore.Worker.newWorker(taskID, resourceProfile);
 
+			// check for exceptions
+			allocateResourceFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
 			// drain the probe messages
 			verify(rmServices.workerStore, Mockito.timeout(timeout.toMilliseconds())).putWorker(expected);
 			assertThat(resourceManager.workersInNew, hasEntry(extractResourceID(taskID), expected));
@@ -529,7 +546,16 @@ public class MesosResourceManagerTest extends TestLogger {
 			// allocate a worker
 			when(rmServices.workerStore.newTaskID()).thenReturn(task1).thenThrow(new AssertionFailedError());
 			rmServices.slotManagerStarted.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-			rmServices.rmActions.allocateResource(resourceProfile1);
+
+			CompletableFuture<Void> allocateResourceFuture = resourceManager.callAsync(
+				() -> {
+					rmServices.rmActions.allocateResource(resourceProfile1);
+					return null;
+				},
+				timeout);
+
+			// check for exceptions
+			allocateResourceFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
 			// verify that a new worker was persisted, the internal state was updated, the task router was notified,
 			// and the launch coordinator was asked to launch a task

http://git-wip-us.apache.org/repos/asf/flink/blob/e0b98969/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index d636ba4..38cfd6e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -46,7 +46,7 @@ import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
 import org.apache.flink.runtime.resourcemanager.registration.JobManagerRegistration;
 import org.apache.flink.runtime.resourcemanager.registration.WorkerRegistration;
-import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceManagerActions;
+import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceActions;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -56,6 +56,7 @@ import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 
 import java.io.Serializable;
 import java.util.HashMap;
@@ -672,7 +673,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		WorkerRegistration<WorkerType> workerRegistration = taskExecutors.remove(resourceID);
 
 		if (workerRegistration != null) {
-			log.info("Task manager {} failed because {}.", resourceID, cause);
+			log.info("Task manager {} failed because {}.", resourceID, cause.getMessage());
 
 			// TODO :: suggest failed task executor to stop itself
 			slotManager.unregisterTaskManager(workerRegistration.getInstanceID());
@@ -709,6 +710,29 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		}
 	}
 
+	protected void releaseResource(InstanceID instanceId) {
+		ResourceID resourceID = null;
+
+		// TODO: Improve performance by having an index on the instanceId
+		for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> entry : taskExecutors.entrySet()) {
+			if (entry.getValue().getInstanceID().equals(instanceId)) {
+				resourceID = entry.getKey();
+				break;
+			}
+		}
+
+		if (resourceID != null) {
+			if (stopWorker(resourceID)) {
+				closeTaskManagerConnection(resourceID, new FlinkException("Worker was stopped."));
+			} else {
+				log.debug("Worker {} was not stopped.", resourceID);
+			}
+		} else {
+			// unregister in order to clean up potential left over state
+			slotManager.unregisterTaskManager(instanceId);
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Info messaging
 	// ------------------------------------------------------------------------
@@ -768,7 +792,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 
 				setFencingToken(newResourceManagerId);
 
-				slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceManagerActionsImpl());
+				slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
 
 				getRpcService().execute(
 					() ->
@@ -837,60 +861,41 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	public abstract void startNewWorker(ResourceProfile resourceProfile);
 
 	/**
-	 * Deallocates a resource.
-	 *
-	 * @param resourceID The resource ID
-	 */
-	public abstract void stopWorker(ResourceID resourceID);
-
-	/**
 	 * Callback when a worker was started.
 	 * @param resourceID The worker resource id
 	 */
 	protected abstract WorkerType workerStarted(ResourceID resourceID);
 
+	/**
+	 * Stops the given worker.
+	 *
+	 * @param resourceID identifying the worker to be stopped
+	 * @return True if the worker was stopped, otherwise false
+	 */
+	public abstract boolean stopWorker(ResourceID resourceID);
+
 	// ------------------------------------------------------------------------
 	//  Static utility classes
 	// ------------------------------------------------------------------------
 
-	private class ResourceManagerActionsImpl implements ResourceManagerActions {
+	private class ResourceActionsImpl implements ResourceActions {
 
 		@Override
 		public void releaseResource(InstanceID instanceId) {
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					ResourceID resourceID = null;
+			validateRunsInMainThread();
 
-					for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> entry : taskExecutors.entrySet()) {
-						if (entry.getValue().getInstanceID().equals(instanceId)) {
-							resourceID = entry.getKey();
-							break;
-						}
-					}
-
-					if (resourceID != null) {
-						stopWorker(resourceID);
-					}
-					else {
-						log.warn("Ignoring request to release TaskManager with instance ID {} (not found).", instanceId);
-					}
-				}
-			});
+			ResourceManager.this.releaseResource(instanceId);
 		}
 
 		@Override
 		public void allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException {
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					startNewWorker(resourceProfile);
-				}
-			});
+			validateRunsInMainThread();
+			startNewWorker(resourceProfile);
 		}
 
 		@Override
 		public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) {
+			validateRunsInMainThread();
 			log.info("Slot request with allocation id {} for job {} failed.", allocationId, jobId, cause);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e0b98969/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index ac2c745..624f31d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -75,8 +75,9 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 	}
 
 	@Override
-	public void stopWorker(ResourceID resourceID) {
-
+	public boolean stopWorker(ResourceID resourceID) {
+		// standalone resource manager cannot stop workers
+		return false;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/e0b98969/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java
new file mode 100644
index 0000000..753e5e2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+
+/**
+ * Resource related actions which the {@link SlotManager} can perform.
+ */
+public interface ResourceActions {
+
+	/**
+	 * Releases the resource with the given instance id.
+	 *
+	 * @param instanceId identifying which resource to release
+	 */
+	void releaseResource(InstanceID instanceId);
+
+	/**
+	 * Requests to allocate a resource with the given {@link ResourceProfile}.
+	 *
+	 * @param resourceProfile for the to be allocated resource
+	 * @throws ResourceManagerException if the resource cannot be allocated
+	 */
+	void allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException;
+
+	/**
+	 * Notifies that an allocation failure has occurred.
+	 *
+	 * @param jobId to which the allocation belonged
+	 * @param allocationId identifying the failed allocation
+	 * @param cause of the allocation failure
+	 */
+	void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e0b98969/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceManagerActions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceManagerActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceManagerActions.java
deleted file mode 100644
index c8b288e..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceManagerActions.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.slotmanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
-
-public interface ResourceManagerActions {
-
-	void releaseResource(InstanceID instanceId);
-
-	void allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException;
-
-	void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e0b98969/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 7da1a9e..ba52f02 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -61,7 +61,7 @@ import java.util.concurrent.TimeoutException;
  * their allocation and all pending slot requests. Whenever a new slot is registered or and
  * allocated slot is freed, then it tries to fulfill another pending slot request. Whenever there
  * are not enough slots available the slot manager will notify the resource manager about it via
- * {@link ResourceManagerActions#allocateResource(ResourceProfile)}.
+ * {@link ResourceActions#allocateResource(ResourceProfile)}.
  *
  * In order to free resources and avoid resource leaks, idling task managers (task managers whose
  * slots are currently not used) and pending slot requests time out triggering their release and
@@ -104,7 +104,7 @@ public class SlotManager implements AutoCloseable {
 	private Executor mainThreadExecutor;
 
 	/** Callbacks for resource (de-)allocations */
-	private ResourceManagerActions resourceManagerActions;
+	private ResourceActions resourceActions;
 
 	private ScheduledFuture<?> taskManagerTimeoutCheck;
 
@@ -130,7 +130,7 @@ public class SlotManager implements AutoCloseable {
 		pendingSlotRequests = new HashMap<>(16);
 
 		resourceManagerId = null;
-		resourceManagerActions = null;
+		resourceActions = null;
 		mainThreadExecutor = null;
 		taskManagerTimeoutCheck = null;
 		slotRequestTimeoutCheck = null;
@@ -175,14 +175,14 @@ public class SlotManager implements AutoCloseable {
 	 *
 	 * @param newResourceManagerId to use for communication with the task managers
 	 * @param newMainThreadExecutor to use to run code in the ResourceManager's main thread
-	 * @param newResourceManagerActions to use for resource (de-)allocations
+	 * @param newResourceActions to use for resource (de-)allocations
 	 */
-	public void start(ResourceManagerId newResourceManagerId, Executor newMainThreadExecutor, ResourceManagerActions newResourceManagerActions) {
+	public void start(ResourceManagerId newResourceManagerId, Executor newMainThreadExecutor, ResourceActions newResourceActions) {
 		LOG.info("Starting the SlotManager.");
 
 		this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
 		mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
-		resourceManagerActions = Preconditions.checkNotNull(newResourceManagerActions);
+		resourceActions = Preconditions.checkNotNull(newResourceActions);
 
 		started = true;
 
@@ -237,7 +237,7 @@ public class SlotManager implements AutoCloseable {
 		}
 
 		resourceManagerId = null;
-		resourceManagerActions = null;
+		resourceActions = null;
 		started = false;
 	}
 
@@ -363,6 +363,8 @@ public class SlotManager implements AutoCloseable {
 	public boolean unregisterTaskManager(InstanceID instanceId) {
 		checkInit();
 
+		LOG.info("Unregister TaskManager {} from the SlotManager.", instanceId);
+
 		TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.remove(instanceId);
 
 		if (null != taskManagerRegistration) {
@@ -634,7 +636,7 @@ public class SlotManager implements AutoCloseable {
 		if (taskManagerSlot != null) {
 			allocateSlot(taskManagerSlot, pendingSlotRequest);
 		} else {
-			resourceManagerActions.allocateResource(pendingSlotRequest.getResourceProfile());
+			resourceActions.allocateResource(pendingSlotRequest.getResourceProfile());
 		}
 	}
 
@@ -822,7 +824,7 @@ public class SlotManager implements AutoCloseable {
 			} catch (ResourceManagerException e) {
 				pendingSlotRequests.remove(allocationId);
 
-				resourceManagerActions.notifyAllocationFailure(
+				resourceActions.notifyAllocationFailure(
 					pendingSlotRequest.getJobId(),
 					allocationId,
 					e);
@@ -870,22 +872,23 @@ public class SlotManager implements AutoCloseable {
 		if (!taskManagerRegistrations.isEmpty()) {
 			long currentTime = System.currentTimeMillis();
 
-			Iterator<Map.Entry<InstanceID, TaskManagerRegistration>> taskManagerRegistrationIterator = taskManagerRegistrations.entrySet().iterator();
+			ArrayList<InstanceID> timedOutTaskManagerIds = new ArrayList<>(taskManagerRegistrations.size());
 
-			while (taskManagerRegistrationIterator.hasNext()) {
-				TaskManagerRegistration taskManagerRegistration = taskManagerRegistrationIterator.next().getValue();
+			// first retrieve the timed out TaskManagers
+			for (TaskManagerRegistration taskManagerRegistration : taskManagerRegistrations.values()) {
 				LOG.debug("Evaluating TaskManager {} for idleness.", taskManagerRegistration.getInstanceId());
 
 				if (currentTime - taskManagerRegistration.getIdleSince() >= taskManagerTimeout.toMilliseconds()) {
-					LOG.info("Removing idle TaskManager {} from the SlotManager.", taskManagerRegistration.getInstanceId());
-
-					taskManagerRegistrationIterator.remove();
-
-					internalUnregisterTaskManager(taskManagerRegistration);
-
-					resourceManagerActions.releaseResource(taskManagerRegistration.getInstanceId());
+					// we collect the instance ids first in order to avoid concurrent modifications by the
+					// ResourceActions.releaseResource call
+					timedOutTaskManagerIds.add(taskManagerRegistration.getInstanceId());
 				}
 			}
+
+			// second we trigger the release resource callback which can decide upon the resource release
+			for (InstanceID timedOutTaskManagerId : timedOutTaskManagerIds) {
+				resourceActions.releaseResource(timedOutTaskManagerId);
+			}
 		}
 	}
 
@@ -905,7 +908,7 @@ public class SlotManager implements AutoCloseable {
 						cancelPendingSlotRequest(slotRequest);
 					}
 
-					resourceManagerActions.notifyAllocationFailure(
+					resourceActions.notifyAllocationFailure(
 						slotRequest.getJobId(),
 						slotRequest.getAllocationId(),
 						new TimeoutException("The allocation could not be fulfilled in time."));

http://git-wip-us.apache.org/repos/asf/flink/blob/e0b98969/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index d642933..726f224 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -76,7 +76,7 @@ public class SlotManagerTest extends TestLogger {
 	@Test
 	public void testTaskManagerRegistration() throws Exception {
 		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
-		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
+		final ResourceActions resourceManagerActions = mock(ResourceActions.class);
 
 		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
 		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
@@ -105,7 +105,7 @@ public class SlotManagerTest extends TestLogger {
 	@Test
 	public void testTaskManagerUnregistration() throws Exception {
 		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
-		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
+		final ResourceActions resourceManagerActions = mock(ResourceActions.class);
 		final JobID jobId = new JobID();
 
 		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
@@ -175,7 +175,7 @@ public class SlotManagerTest extends TestLogger {
 			resourceProfile,
 			"localhost");
 
-		ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
+		ResourceActions resourceManagerActions = mock(ResourceActions.class);
 
 		try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
 
@@ -198,7 +198,7 @@ public class SlotManagerTest extends TestLogger {
 			resourceProfile,
 			"localhost");
 
-		ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
+		ResourceActions resourceManagerActions = mock(ResourceActions.class);
 		doThrow(new ResourceManagerException("Test exception")).when(resourceManagerActions).allocateResource(any(ResourceProfile.class));
 
 		try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
@@ -230,7 +230,7 @@ public class SlotManagerTest extends TestLogger {
 			resourceProfile,
 			targetAddress);
 
-		ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
+		ResourceActions resourceManagerActions = mock(ResourceActions.class);
 
 		try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
 
@@ -270,7 +270,7 @@ public class SlotManagerTest extends TestLogger {
 	@Test
 	public void testUnregisterPendingSlotRequest() throws Exception {
 		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
-		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
+		final ResourceActions resourceManagerActions = mock(ResourceActions.class);
 		final SlotID slotId = new SlotID(ResourceID.generate(), 0);
 		final AllocationID allocationId = new AllocationID();
 
@@ -329,7 +329,7 @@ public class SlotManagerTest extends TestLogger {
 			resourceProfile,
 			targetAddress);
 
-		ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
+		ResourceActions resourceManagerActions = mock(ResourceActions.class);
 
 		// accept an incoming slot request
 		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
@@ -376,7 +376,7 @@ public class SlotManagerTest extends TestLogger {
 		final AllocationID allocationId = new AllocationID();
 		final ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
 
-		ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
+		ResourceActions resourceManagerActions = mock(ResourceActions.class);
 
 		// accept an incoming slot request
 		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
@@ -416,7 +416,7 @@ public class SlotManagerTest extends TestLogger {
 	@Test
 	public void testDuplicatePendingSlotRequest() throws Exception {
 		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
-		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
+		final ResourceActions resourceManagerActions = mock(ResourceActions.class);
 		final AllocationID allocationId = new AllocationID();
 		final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 2);
 		final ResourceProfile resourceProfile2 = new ResourceProfile(2.0, 1);
@@ -440,7 +440,7 @@ public class SlotManagerTest extends TestLogger {
 	@Test
 	public void testDuplicatePendingSlotRequestAfterSlotReport() throws Exception {
 		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
-		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
+		final ResourceActions resourceManagerActions = mock(ResourceActions.class);
 		final JobID jobId = new JobID();
 		final AllocationID allocationId = new AllocationID();
 		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
@@ -468,7 +468,7 @@ public class SlotManagerTest extends TestLogger {
 	@Test
 	public void testDuplicatePendingSlotRequestAfterSuccessfulAllocation() throws Exception {
 		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
-		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
+		final ResourceActions resourceManagerActions = mock(ResourceActions.class);
 		final AllocationID allocationId = new AllocationID();
 		final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 2);
 		final ResourceProfile resourceProfile2 = new ResourceProfile(2.0, 1);
@@ -513,7 +513,7 @@ public class SlotManagerTest extends TestLogger {
 	@Test
 	public void testAcceptingDuplicateSlotRequestAfterAllocationRelease() throws Exception {
 		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
-		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
+		final ResourceActions resourceManagerActions = mock(ResourceActions.class);
 		final AllocationID allocationId = new AllocationID();
 		final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 2);
 		final ResourceProfile resourceProfile2 = new ResourceProfile(2.0, 1);
@@ -566,7 +566,7 @@ public class SlotManagerTest extends TestLogger {
 	@Test
 	public void testReceivingUnknownSlotReport() throws Exception {
 		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
-		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
+		final ResourceActions resourceManagerActions = mock(ResourceActions.class);
 
 		final InstanceID unknownInstanceID = new InstanceID();
 		final SlotID unknownSlotId = new SlotID(ResourceID.generate(), 0);
@@ -592,7 +592,7 @@ public class SlotManagerTest extends TestLogger {
 	@Test
 	public void testUpdateSlotReport() throws Exception {
 		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
-		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
+		final ResourceActions resourceManagerActions = mock(ResourceActions.class);
 
 		final JobID jobId = new JobID();
 		final AllocationID allocationId = new AllocationID();
@@ -649,7 +649,7 @@ public class SlotManagerTest extends TestLogger {
 	public void testTaskManagerTimeout() throws Exception {
 		final long tmTimeout = 500L;
 
-		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
+		final ResourceActions resourceManagerActions = mock(ResourceActions.class);
 		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
 
 		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
@@ -691,7 +691,7 @@ public class SlotManagerTest extends TestLogger {
 	public void testSlotRequestTimeout() throws Exception {
 		final long allocationTimeout = 50L;
 
-		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
+		final ResourceActions resourceManagerActions = mock(ResourceActions.class);
 		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
 		final JobID jobId = new JobID();
 		final AllocationID allocationId = new AllocationID();
@@ -740,7 +740,7 @@ public class SlotManagerTest extends TestLogger {
 	@SuppressWarnings("unchecked")
 	public void testTaskManagerSlotRequestTimeoutHandling() throws Exception {
 		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
-		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
+		final ResourceActions resourceManagerActions = mock(ResourceActions.class);
 
 		final JobID jobId = new JobID();
 		final AllocationID allocationId = new AllocationID();
@@ -819,7 +819,7 @@ public class SlotManagerTest extends TestLogger {
 	public void testSlotReportWhileActiveSlotRequest() throws Exception {
 		final long verifyTimeout = 10000L;
 		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
-		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
+		final ResourceActions resourceManagerActions = mock(ResourceActions.class);
 
 		final JobID jobId = new JobID();
 		final AllocationID allocationId = new AllocationID();
@@ -937,7 +937,7 @@ public class SlotManagerTest extends TestLogger {
 		final long verifyTimeout = taskManagerTimeout * 10L;
 
 		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
-		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
+		final ResourceActions resourceManagerActions = mock(ResourceActions.class);
 		final ScheduledExecutor scheduledExecutor = TestingUtils.defaultScheduledExecutor();
 
 		final ResourceID resourceId = ResourceID.generate();
@@ -1024,7 +1024,50 @@ public class SlotManagerTest extends TestLogger {
 		}
 	}
 
-	private SlotManager createSlotManager(ResourceManagerId resourceManagerId, ResourceManagerActions resourceManagerActions) {
+	/**
+	 * Tests that a task manager timeout does not remove the slots from the SlotManager.
+	 * A timeout should only trigger the {@link ResourceActions#releaseResource(InstanceID)}
+	 * callback. The receiver of the callback can then decide what to do with the TaskManager.
+	 *
+	 * FLINK-7793
+	 */
+	@Test
+	public void testTaskManagerTimeoutDoesNotRemoveSlots() throws Exception {
+		final Time taskManagerTimeout = Time.milliseconds(10L);
+		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
+		final ResourceActions resourceActions = mock(ResourceActions.class);
+		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+
+		final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
+		final SlotStatus slotStatus = new SlotStatus(
+			new SlotID(ResourceID.generate(), 0),
+			new ResourceProfile(1.0, 1));
+		final SlotReport initialSlotReport = new SlotReport(slotStatus);
+
+		try (final SlotManager slotManager = new SlotManager(
+			TestingUtils.defaultScheduledExecutor(),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime(),
+			taskManagerTimeout)) {
+
+			slotManager.start(resourceManagerId, Executors.directExecutor(), resourceActions);
+
+			slotManager.registerTaskManager(taskExecutorConnection, initialSlotReport);
+
+			assertEquals(1, slotManager.getNumberRegisteredSlots());
+
+			// wait for the timeout call to happen
+			verify(resourceActions, timeout(taskManagerTimeout.toMilliseconds() * 3L)).releaseResource(eq(taskExecutorConnection.getInstanceID()));
+
+			assertEquals(1, slotManager.getNumberRegisteredSlots());
+
+			slotManager.unregisterTaskManager(taskExecutorConnection.getInstanceID());
+
+			assertEquals(0, slotManager.getNumberRegisteredSlots());
+		}
+	}
+
+	private SlotManager createSlotManager(ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions) {
 		SlotManager slotManager = new SlotManager(
 			TestingUtils.defaultScheduledExecutor(),
 			TestingUtils.infiniteTime(),

http://git-wip-us.apache.org/repos/asf/flink/blob/e0b98969/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 6de4d52..97942ea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -85,7 +85,7 @@ public class SlotProtocolTest extends TestLogger {
 			TestingUtils.infiniteTime(),
 			TestingUtils.infiniteTime())) {
 
-			ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
+			ResourceActions resourceManagerActions = mock(ResourceActions.class);
 
 			slotManager.start(rmLeaderID, Executors.directExecutor(), resourceManagerActions);
 
@@ -147,7 +147,7 @@ public class SlotProtocolTest extends TestLogger {
 			TestingUtils.infiniteTime(),
 			TestingUtils.infiniteTime())) {
 
-			ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
+			ResourceActions resourceManagerActions = mock(ResourceActions.class);
 
 			slotManager.start(rmLeaderID, Executors.directExecutor(), resourceManagerActions);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e0b98969/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index dd12fef..b32d25c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -227,8 +227,9 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 	}
 
 	@Override
-	public void stopWorker(ResourceID resourceID) {
+	public boolean stopWorker(ResourceID resourceID) {
 		// TODO: Implement to stop the worker
+		return false;
 	}
 
 	@Override


[2/3] flink git commit: [hotfix] Remove redundant job status logging

Posted by tr...@apache.org.
[hotfix] Remove redundant job status logging


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bee8c995
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bee8c995
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bee8c995

Branch: refs/heads/master
Commit: bee8c995404cffc53e95800b77a3eb55355d08a5
Parents: e0b9896
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Oct 11 00:16:31 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sat Oct 21 16:52:36 2017 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/runtime/jobmaster/JobMaster.java  | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bee8c995/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 8f87b44..19bf2a5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -866,14 +866,15 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 		errorHandler.onFatalError(cause);
 	}
 
-	private void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) {
+	private void jobStatusChanged(
+			final JobStatus newJobStatus,
+			long timestamp,
+			@Nullable final Throwable error) {
 		validateRunsInMainThread();
 
 		final JobID jobID = executionGraph.getJobID();
 		final String jobName = executionGraph.getJobName();
-
-		log.info("Status of job {} ({}) changed to {}.", jobID, jobName, newJobStatus, error);
-
+		
 		if (newJobStatus.isGloballyTerminalState()) {
 			switch (newJobStatus) {
 				case FINISHED: