You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/08/24 21:12:54 UTC

[5/6] flink git commit: [FLINK-4457] Make ExecutionGraph independent of actors.

[FLINK-4457] Make ExecutionGraph independent of actors.

This introduces the JobStatusListener and ExecutionStatusListener interfaces,
which replace the ActorRefs and ActorGateways previously used as listeners.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/635c8693
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/635c8693
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/635c8693

Branch: refs/heads/master
Commit: 635c869326cc77e4199e4d8ee597aed69ed16cd2
Parents: 4e9d177
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Aug 24 19:12:07 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Aug 24 21:19:04 2016 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       | 34 +++-----
 .../CheckpointCoordinatorDeActivator.java       | 45 ++++-------
 .../runtime/executiongraph/ExecutionGraph.java  | 81 ++++++++++----------
 .../executiongraph/ExecutionStatusListener.java | 54 +++++++++++++
 .../executiongraph/JobStatusListener.java       | 39 ++++++++++
 .../executiongraph/StatusListenerMessenger.java | 70 +++++++++++++++++
 .../flink/runtime/jobmanager/JobManager.scala   | 12 ++-
 ...ExecutionGraphCheckpointCoordinatorTest.java |  3 -
 .../LeaderChangeJobRecoveryTest.java            | 73 ++++--------------
 .../flink/core/testutils/OneShotLatch.java      | 55 +++++++++++++
 .../flink/core/testutils/OneShotLatchTest.java  | 55 +++++++++++++
 11 files changed, 360 insertions(+), 161 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/635c8693/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index b710324..3619f48 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
 import akka.dispatch.Futures;
 
 import org.apache.flink.api.common.JobID;
@@ -31,8 +28,7 @@ import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
@@ -57,7 +53,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
-import java.util.UUID;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -137,7 +132,7 @@ public class CheckpointCoordinator {
 	private final Timer timer;
 
 	/** Actor that receives status updates from the execution graph this coordinator works for */
-	private ActorGateway jobStatusListener;
+	private JobStatusListener jobStatusListener;
 
 	/** The number of consecutive failed trigger attempts */
 	private int numUnsuccessfulCheckpointsTriggers;
@@ -266,12 +261,6 @@ public class CheckpointCoordinator {
 				// shut down the thread that handles the timeouts and pending triggers
 				timer.cancel();
 
-				// make sure that the actor does not linger
-				if (jobStatusListener != null) {
-					jobStatusListener.tell(PoisonPill.getInstance());
-					jobStatusListener = null;
-				}
-
 				// clear and discard all pending checkpoints
 				for (PendingCheckpoint pending : pendingCheckpoints.values()) {
 					pending.abortError(new Exception("Checkpoint Coordinator is shutting down"));
@@ -903,7 +892,7 @@ public class CheckpointCoordinator {
 	//  Periodic scheduling of checkpoints
 	// --------------------------------------------------------------------------------------------
 
-	public void startCheckpointScheduler() throws Exception {
+	public void startCheckpointScheduler() {
 		synchronized (lock) {
 			if (shutdown) {
 				throw new IllegalArgumentException("Checkpoint coordinator is shut down");
@@ -918,7 +907,7 @@ public class CheckpointCoordinator {
 		}
 	}
 
-	public void stopCheckpointScheduler() throws Exception {
+	public void stopCheckpointScheduler() {
 		synchronized (lock) {
 			triggerRequestQueued = false;
 			periodicScheduling = false;
@@ -929,10 +918,14 @@ public class CheckpointCoordinator {
 			}
 
 			for (PendingCheckpoint p : pendingCheckpoints.values()) {
-				p.abortError(new Exception("Checkpoint Coordinator is suspending."));
+				try {
+					p.abortError(new Exception("Checkpoint Coordinator is suspending."));
+				} catch (Throwable t) {
+					LOG.error("Error while disposing pending checkpoint", t);
+				}
 			}
-			pendingCheckpoints.clear();
 
+			pendingCheckpoints.clear();
 			numUnsuccessfulCheckpointsTriggers = 0;
 		}
 	}
@@ -941,17 +934,14 @@ public class CheckpointCoordinator {
 	//  job status listener that schedules / cancels periodic checkpoints
 	// ------------------------------------------------------------------------
 
-	public ActorGateway createActivatorDeactivator(ActorSystem actorSystem, UUID leaderSessionID) {
+	public JobStatusListener createActivatorDeactivator() {
 		synchronized (lock) {
 			if (shutdown) {
 				throw new IllegalArgumentException("Checkpoint coordinator is shut down");
 			}
 
 			if (jobStatusListener == null) {
-				Props props = Props.create(CheckpointCoordinatorDeActivator.class, this, leaderSessionID);
-
-				// wrap the ActorRef in a AkkaActorGateway to support message decoration
-				jobStatusListener = new AkkaActorGateway(actorSystem.actorOf(props), leaderSessionID);
+				jobStatusListener = new CheckpointCoordinatorDeActivator(this);
 			}
 
 			return jobStatusListener;

http://git-wip-us.apache.org/repos/asf/flink/blob/635c8693/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
index 7e26f71..2e23d6a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
@@ -18,51 +18,32 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.flink.runtime.akka.FlinkUntypedActor;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.messages.ExecutionGraphMessages;
-import org.apache.flink.util.Preconditions;
 
-import java.util.UUID;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * This actor listens to changes in the JobStatus and activates or deactivates the periodic
  * checkpoint scheduler.
  */
-public class CheckpointCoordinatorDeActivator extends FlinkUntypedActor {
+public class CheckpointCoordinatorDeActivator implements JobStatusListener {
 
 	private final CheckpointCoordinator coordinator;
-	private final UUID leaderSessionID;
-	
-	public CheckpointCoordinatorDeActivator(
-			CheckpointCoordinator coordinator,
-			UUID leaderSessionID) {
 
-		LOG.info("Create CheckpointCoordinatorDeActivator");
-
-		this.coordinator = Preconditions.checkNotNull(coordinator, "The checkpointCoordinator must not be null.");
-		this.leaderSessionID = leaderSessionID;
+	public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator) {
+		this.coordinator = checkNotNull(coordinator);
 	}
 
 	@Override
-	public void handleMessage(Object message) throws Exception {
-		if (message instanceof ExecutionGraphMessages.JobStatusChanged) {
-			JobStatus status = ((ExecutionGraphMessages.JobStatusChanged) message).newJobStatus();
-			
-			if (status == JobStatus.RUNNING) {
-				// start the checkpoint scheduler
-				coordinator.startCheckpointScheduler();
-			} else {
-				// anything else should stop the trigger for now
-				coordinator.stopCheckpointScheduler();
-			}
+	public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
+		if (newJobStatus == JobStatus.RUNNING) {
+			// start the checkpoint scheduler
+			coordinator.startCheckpointScheduler();
+		} else {
+			// anything else should stop the trigger for now
+			coordinator.stopCheckpointScheduler();
 		}
-		
-		// we ignore all other messages
-	}
-
-	@Override
-	public UUID getLeaderSessionID() {
-		return leaderSessionID;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/635c8693/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 12d8e66..7a94c0f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import akka.actor.ActorSystem;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
@@ -41,7 +40,6 @@ import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -50,15 +48,16 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.messages.ExecutionGraphMessages;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -75,12 +74,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
-import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * The execution graph is the central data structure that coordinates the distributed
  * execution of a data flow. It keeps representations of each parallel task, each
@@ -151,12 +150,12 @@ public class ExecutionGraph {
 	 * accessible on all nodes in the cluster. */
 	private final List<URL> requiredClasspaths;
 
-	/** Listeners that receive messages when the entire job switches it status (such as from
-	 * RUNNING to FINISHED) */
-	private final List<ActorGateway> jobStatusListenerActors;
+	/** Listeners that receive messages when the entire job switches its status
+	 * (such as from RUNNING to FINISHED) */
+	private final List<JobStatusListener> jobStatusListeners;
 
 	/** Listeners that receive messages whenever a single task execution changes its status */
-	private final List<ActorGateway> executionListenerActors;
+	private final List<ExecutionStatusListener> executionListeners;
 
 	/** Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()} when
 	 * the execution graph transitioned into a certain state. The index into this array is the
@@ -284,8 +283,8 @@ public class ExecutionGraph {
 		this.verticesInCreationOrder = new ArrayList<ExecutionJobVertex>();
 		this.currentExecutions = new ConcurrentHashMap<ExecutionAttemptID, Execution>();
 
-		this.jobStatusListenerActors  = new CopyOnWriteArrayList<ActorGateway>();
-		this.executionListenerActors = new CopyOnWriteArrayList<ActorGateway>();
+		this.jobStatusListeners  = new CopyOnWriteArrayList<>();
+		this.executionListeners = new CopyOnWriteArrayList<>();
 
 		this.stateTimestamps = new long[JobStatus.values().length];
 		this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();
@@ -345,8 +344,6 @@ public class ExecutionGraph {
 			List<ExecutionJobVertex> verticesToTrigger,
 			List<ExecutionJobVertex> verticesToWaitFor,
 			List<ExecutionJobVertex> verticesToCommitTo,
-			ActorSystem actorSystem,
-			UUID leaderSessionID,
 			CheckpointIDCounter checkpointIDCounter,
 			CompletedCheckpointStore checkpointStore,
 			SavepointStore savepointStore,
@@ -388,8 +385,7 @@ public class ExecutionGraph {
 
 		// the periodic checkpoint scheduler is activated and deactivated as a result of
 		// job status changes (running -> on, all other states -> off)
-		registerJobStatusListener(
-				checkpointCoordinator.createActivatorDeactivator(actorSystem, leaderSessionID));
+		registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
 	}
 
 	/**
@@ -935,8 +931,8 @@ public class ExecutionGraph {
 		intermediateResults.clear();
 		currentExecutions.clear();
 		requiredJarFiles.clear();
-		jobStatusListenerActors.clear();
-		executionListenerActors.clear();
+		jobStatusListeners.clear();
+		executionListeners.clear();
 
 		isArchived = true;
 	}
@@ -1173,45 +1169,52 @@ public class ExecutionGraph {
 	//  Listeners & Observers
 	// --------------------------------------------------------------------------------------------
 
-	public void registerJobStatusListener(ActorGateway listener) {
+	public void registerJobStatusListener(JobStatusListener listener) {
 		if (listener != null) {
-			this.jobStatusListenerActors.add(listener);
+			jobStatusListeners.add(listener);
 		}
 	}
 
-	public void registerExecutionListener(ActorGateway listener) {
+	public void registerExecutionListener(ExecutionStatusListener listener) {
 		if (listener != null) {
-			this.executionListenerActors.add(listener);
+			executionListeners.add(listener);
 		}
 	}
 
 	private void notifyJobStatusChange(JobStatus newState, Throwable error) {
-		if (jobStatusListenerActors.size() > 0) {
-			ExecutionGraphMessages.JobStatusChanged message =
-					new ExecutionGraphMessages.JobStatusChanged(jobID, newState, System.currentTimeMillis(),
-							error == null ? null : new SerializedThrowable(error));
-
-			for (ActorGateway listener: jobStatusListenerActors) {
-				listener.tell(message);
+		if (jobStatusListeners.size() > 0) {
+			final long timestamp = System.currentTimeMillis();
+			final Throwable serializedError = error == null ? null : new SerializedThrowable(error);
+
+			for (JobStatusListener listener : jobStatusListeners) {
+				try {
+					listener.jobStatusChanges(jobID, newState, timestamp, serializedError);
+				} catch (Throwable t) {
+					LOG.warn("Error while notifying JobStatusListener", t);
+				}
 			}
 		}
 	}
 
-	void notifyExecutionChange(JobVertexID vertexId, int subtask, ExecutionAttemptID executionID, ExecutionState
-							newExecutionState, Throwable error)
+	void notifyExecutionChange(
+			JobVertexID vertexId, int subtask, ExecutionAttemptID executionID,
+			ExecutionState newExecutionState, Throwable error)
 	{
 		ExecutionJobVertex vertex = getJobVertex(vertexId);
 
-		if (executionListenerActors.size() > 0) {
-			String message = error == null ? null : ExceptionUtils.stringifyException(error);
-			ExecutionGraphMessages.ExecutionStateChanged actorMessage =
-					new ExecutionGraphMessages.ExecutionStateChanged(jobID, vertexId,  vertex.getJobVertex().getName(),
-																	vertex.getParallelism(), subtask,
-																	executionID, newExecutionState,
-																	System.currentTimeMillis(), message);
-
-			for (ActorGateway listener : executionListenerActors) {
-				listener.tell(actorMessage);
+		if (executionListeners.size() > 0) {
+			final String message = error == null ? null : ExceptionUtils.stringifyException(error);
+			final long timestamp = System.currentTimeMillis();
+
+			for (ExecutionStatusListener listener : executionListeners) {
+				try {
+					listener.executionStatusChanged(
+							jobID, vertexId, vertex.getJobVertex().getName(),
+							vertex.getParallelism(), subtask, executionID, newExecutionState,
+							timestamp, message);
+				} catch (Throwable t) {
+					LOG.warn("Error while notifying ExecutionStatusListener", t);
+				}
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/635c8693/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionStatusListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionStatusListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionStatusListener.java
new file mode 100644
index 0000000..6fb5a1a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionStatusListener.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+/**
+ * Interface for observers that monitor the status of individual task executions.
+ */
+public interface ExecutionStatusListener {
+
+	/**
+	 * Called whenever the execution status of a task changes.
+	 * 
+	 * @param jobID                  The ID of the job
+	 * @param vertexID               The ID of the task vertex
+	 * @param taskName               The name of the task
+	 * @param totalNumberOfSubTasks  The parallelism of the task
+	 * @param subtaskIndex           The subtask's parallel index
+	 * @param executionID            The ID of the execution attempt
+	 * @param newExecutionState      The status to which the task switched
+	 * @param timestamp              The timestamp when the change occurred. Informational only.
+	 * @param optionalMessage        An optional message attached to the status change, like an
+	 *                               exception message.
+	 */
+	void executionStatusChanged(
+			JobID jobID,
+			JobVertexID vertexID,
+			String taskName,
+			int totalNumberOfSubTasks,
+			int subtaskIndex,
+			ExecutionAttemptID executionID,
+			ExecutionState newExecutionState,
+			long timestamp,
+			String optionalMessage);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/635c8693/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java
new file mode 100644
index 0000000..1d97a5c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+
+/**
+ * Interface for observers that monitor the status of a job.
+ */
+public interface JobStatusListener {
+
+	/**
+	 * This method is called whenever the status of the job changes.
+	 * 
+	 * @param jobId         The ID of the job.
+	 * @param newJobStatus  The status the job switched to.
+	 * @param timestamp     The timestamp when the status transition occurred.
+	 * @param error         In case the job status switches to a failure state, this is the
+	 *                      exception that caused the failure.
+	 */
+	void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/635c8693/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/StatusListenerMessenger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/StatusListenerMessenger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/StatusListenerMessenger.java
new file mode 100644
index 0000000..01f1e75
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/StatusListenerMessenger.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import akka.actor.ActorRef;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.ExecutionGraphMessages;
+import org.apache.flink.runtime.util.SerializedThrowable;
+
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@code JobStatusListener} and {@code ExecutionStatusListener} that sends an actor message
+ * for each status change.
+ */
+public class StatusListenerMessenger implements JobStatusListener, ExecutionStatusListener {
+
+	private final AkkaActorGateway target;
+
+	public StatusListenerMessenger(ActorRef target, UUID leaderSessionId) {
+		this.target = new AkkaActorGateway(checkNotNull(target), leaderSessionId);
+	}
+
+	@Override
+	public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
+		ExecutionGraphMessages.JobStatusChanged message =
+				new ExecutionGraphMessages.JobStatusChanged(jobId, newJobStatus, timestamp,
+						error == null ? null : new SerializedThrowable(error));
+
+		target.tell(message);
+	}
+
+	@Override
+	public void executionStatusChanged(
+			JobID jobID, JobVertexID vertexID,
+			String taskName, int taskParallelism, int subtaskIndex,
+			ExecutionAttemptID executionID, ExecutionState newExecutionState,
+			long timestamp, String optionalMessage) {
+		
+		ExecutionGraphMessages.ExecutionStateChanged message = 
+				new ExecutionGraphMessages.ExecutionStateChanged(
+					jobID, vertexID, taskName, taskParallelism, subtaskIndex,
+					executionID, newExecutionState, timestamp, optionalMessage);
+
+		target.tell(message);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/635c8693/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 34fed3f..0587987 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -51,7 +51,7 @@ import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceMa
 import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
-import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionJobVertex}
+import org.apache.flink.runtime.executiongraph.{StatusListenerMessenger, ExecutionGraph, ExecutionJobVertex}
 import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager}
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator
 import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID}
@@ -1249,8 +1249,6 @@ class JobManager(
             triggerVertices,
             ackVertices,
             confirmVertices,
-            context.system,
-            leaderSessionID.orNull,
             checkpointIdCounter,
             completedCheckpoints,
             savepointStore,
@@ -1259,14 +1257,14 @@ class JobManager(
 
         // get notified about job status changes
         executionGraph.registerJobStatusListener(
-          new AkkaActorGateway(self, leaderSessionID.orNull))
+          new StatusListenerMessenger(self, leaderSessionID.orNull))
 
         if (jobInfo.listeningBehaviour == ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) {
           // the sender wants to be notified about state changes
-          val gateway = new AkkaActorGateway(jobInfo.client, leaderSessionID.orNull)
+          val listener  = new StatusListenerMessenger(jobInfo.client, leaderSessionID.orNull)
 
-          executionGraph.registerExecutionListener(gateway)
-          executionGraph.registerJobStatusListener(gateway)
+          executionGraph.registerExecutionListener(listener)
+          executionGraph.registerJobStatusListener(listener)
         }
       } catch {
         case t: Throwable =>

http://git-wip-us.apache.org/repos/asf/flink/blob/635c8693/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 7b05fd7..49a9449 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -40,7 +40,6 @@ import scala.concurrent.duration.FiniteDuration;
 
 import java.net.URL;
 import java.util.Collections;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import static org.mockito.Mockito.mock;
@@ -117,8 +116,6 @@ public class ExecutionGraphCheckpointCoordinatorTest {
 				Collections.<ExecutionJobVertex>emptyList(),
 				Collections.<ExecutionJobVertex>emptyList(),
 				Collections.<ExecutionJobVertex>emptyList(),
-				system,
-				UUID.randomUUID(),
 				counter,
 				store,
 				new HeapSavepointStore(),

http://git-wip-us.apache.org/repos/asf/flink/blob/635c8693/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
index 57de2cd..450f9fb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
@@ -18,28 +18,28 @@
 
 package org.apache.flink.runtime.leaderelection;
 
-import akka.actor.ActorRef;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
 import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.messages.ExecutionGraphMessages;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Before;
 import org.junit.Test;
+
 import scala.concurrent.Await;
-import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
-import scala.concurrent.Promise;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.UUID;
@@ -113,15 +113,12 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
 
 		ExecutionGraph executionGraph = ((TestingJobManagerMessages.ExecutionGraphFound) responseExecutionGraph).executionGraph();
 
-		TestActorGateway testActorGateway = new TestActorGateway();
-
-		executionGraph.registerJobStatusListener(testActorGateway);
+		TestJobStatusListener testListener = new TestJobStatusListener();
+		executionGraph.registerJobStatusListener(testListener);
 
 		cluster.revokeLeadership();
 
-		Future<Boolean> hasReachedTerminalState = testActorGateway.hasReachedTerminalState();
-
-		assertTrue("The job should have reached a terminal state.", Await.result(hasReachedTerminalState, timeout));
+		testListener.waitForTerminalState(30000);
 	}
 
 	public JobGraph createBlockingJob(int parallelism) {
@@ -150,59 +147,19 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
 		return jobGraph;
 	}
 
-	public static class TestActorGateway implements ActorGateway {
-
-		private static final long serialVersionUID = -736146686160538227L;
-		private transient Promise<Boolean> terminalState = new scala.concurrent.impl.Promise.DefaultPromise<>();
-
-		public Future<Boolean> hasReachedTerminalState() {
-			return terminalState.future();
-		}
+	public static class TestJobStatusListener implements JobStatusListener {
 
-		@Override
-		public Future<Object> ask(Object message, FiniteDuration timeout) {
-			return null;
-		}
+		private final OneShotLatch terminalStateLatch = new OneShotLatch();
 
-		@Override
-		public void tell(Object message) {
-			this.tell(message, new AkkaActorGateway(ActorRef.noSender(), null));
+		public void waitForTerminalState(long timeoutMillis) throws InterruptedException, TimeoutException {
+			terminalStateLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
 		}
 
 		@Override
-		public void tell(Object message, ActorGateway sender) {
-			if (message instanceof ExecutionGraphMessages.JobStatusChanged) {
-				ExecutionGraphMessages.JobStatusChanged jobStatusChanged = (ExecutionGraphMessages.JobStatusChanged) message;
-
-				if (jobStatusChanged.newJobStatus().isGloballyTerminalState() || jobStatusChanged.newJobStatus() == JobStatus.SUSPENDED) {
-					terminalState.success(true);
-				}
+		public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
+			if (newJobStatus.isGloballyTerminalState() || newJobStatus == JobStatus.SUSPENDED) {
+				terminalStateLatch.trigger();
 			}
 		}
-
-		@Override
-		public void forward(Object message, ActorGateway sender) {
-
-		}
-
-		@Override
-		public Future<Object> retry(Object message, int numberRetries, FiniteDuration timeout, ExecutionContext executionContext) {
-			return null;
-		}
-
-		@Override
-		public String path() {
-			return null;
-		}
-
-		@Override
-		public ActorRef actor() {
-			return null;
-		}
-
-		@Override
-		public UUID leaderSessionID() {
-			return null;
-		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/635c8693/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
index 54ac110..0418bf5 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.core.testutils;
 
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
 /**
  * Latch for synchronizing parts of code in tests. Once the latch has fired once calls to
  * {@link #await()} will return immediately in the future.
@@ -44,6 +47,8 @@ public final class OneShotLatch {
 	/**
 	 * Waits until {@link #trigger())} is called. Once {@code #trigger()} has been called this
 	 * call will always return immediately.
+	 * 
+	 * @throws InterruptedException Thrown if the thread is interrupted while waiting.
 	 */
 	public void await() throws InterruptedException {
 		synchronized (lock) {
@@ -52,4 +57,54 @@ public final class OneShotLatch {
 			}
 		}
 	}
+
+	/**
+	 * Waits until {@link #trigger()} is called. Once {@code #trigger()} has been called this
+	 * call will always return immediately.
+	 * 
+	 * <p>If the latch is not triggered within the given timeout, a {@code TimeoutException}
+	 * will be thrown after the timeout.
+	 * 
+	 * <p>A timeout value of zero means infinite timeout and makes this equivalent to {@link #await()}.
+	 * 
+	 * @param timeout   The value of the timeout, a value of zero indicating infinite timeout.
+	 * @param timeUnit  The unit of the timeout
+	 * 
+	 * @throws InterruptedException Thrown if the thread is interrupted while waiting.
+	 * @throws TimeoutException Thrown if the latch is not triggered within the timeout.
+	 */
+	public void await(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
+		if (timeout < 0) {
+			throw new IllegalArgumentException("time may not be negative");
+		}
+		if (timeUnit == null) {
+			throw new NullPointerException("timeUnit");
+		}
+
+		if (timeout == 0) {
+			await();
+		} else {
+			final long deadline = System.nanoTime() + timeUnit.toNanos(timeout);
+			long millisToWait;
+
+			synchronized (lock) {
+				while (!triggered && (millisToWait = (deadline - System.nanoTime()) / 1_000_000) > 0) {
+					lock.wait(millisToWait);
+				}
+
+				if (!triggered) {
+					throw new TimeoutException();
+				}
+			}
+		}
+	}
+
+	/**
+	 * Checks if the latch was triggered.
+	 * 
+	 * @return True, if the latch was triggered, false if not.
+	 */
+	public boolean isTriggered() {
+		return triggered;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/635c8693/flink-test-utils-parent/flink-test-utils-junit/src/test/java/org/apache/flink/core/testutils/OneShotLatchTest.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/test/java/org/apache/flink/core/testutils/OneShotLatchTest.java b/flink-test-utils-parent/flink-test-utils-junit/src/test/java/org/apache/flink/core/testutils/OneShotLatchTest.java
new file mode 100644
index 0000000..575c84c
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/test/java/org/apache/flink/core/testutils/OneShotLatchTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.testutils;
+
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class OneShotLatchTest {
+
+	@Test
+	public void testAwaitWithTimeout() throws Exception {
+		OneShotLatch latch = new OneShotLatch();
+		assertFalse(latch.isTriggered());
+
+		try {
+			latch.await(1, TimeUnit.MILLISECONDS);
+			fail("should fail with a TimeoutException");
+		} catch (TimeoutException e) {
+			// expected
+		}
+
+		assertFalse(latch.isTriggered());
+
+		latch.trigger();
+		assertTrue(latch.isTriggered());
+
+		latch.await(100, TimeUnit.DAYS);
+		assertTrue(latch.isTriggered());
+
+		latch.await(0, TimeUnit.MILLISECONDS);
+		assertTrue(latch.isTriggered());
+	}
+}