You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:45 UTC
[26/52] [abbrv] flink git commit: [FLINK-4986] Improvements to the
JobMaster
[FLINK-4986] Improvements to the JobMaster
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8730e200
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8730e200
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8730e200
Branch: refs/heads/master
Commit: 8730e200864427dd5d6ddb9f841978d68ab452bd
Parents: 91f1d09
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Oct 17 20:26:58 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:25 2016 +0100
----------------------------------------------------------------------
.../runtime/executiongraph/ExecutionGraph.java | 15 +
.../flink/runtime/jobmaster/JobMaster.java | 336 ++++++++++---------
2 files changed, 184 insertions(+), 167 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8730e200/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index cbb4c7e..2025fc2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -621,6 +621,21 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}
/**
+ * Gets the accumulator results.
+ */
+ public Map<String, Object> getAccumulators() throws IOException {
+
+ Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();
+
+ Map<String, Object> result = new HashMap<>();
+ for (Map.Entry<String, Accumulator<?, ?>> entry : accumulatorMap.entrySet()) {
+ result.put(entry.getKey(), entry.getValue().getLocalValue());
+ }
+
+ return result;
+ }
+
+ /**
* Gets a serialized accumulator map.
* @return The accumulator map with serialized accumulator values.
* @throws IOException
http://git-wip-us.apache.org/repos/asf/flink/blob/8730e200/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 458bf0c..0b3b68e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.jobmaster;
+import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
@@ -33,7 +34,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.client.SerializedJobExecutionResult;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.BiFunction;
@@ -51,6 +51,7 @@ import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.LeaderIdMismatchException;
import org.apache.flink.runtime.instance.Slot;
import org.apache.flink.runtime.instance.SlotDescriptor;
import org.apache.flink.runtime.instance.SlotPool;
@@ -90,7 +91,6 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.SerializedThrowable;
import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import javax.annotation.Nullable;
@@ -246,6 +246,9 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
-1,
log);
+ // register self as job status change listener
+ executionGraph.registerJobStatusListener(new JobManagerJobStatusListener());
+
this.slotPool = new SlotPool(executorService);
this.allocationTimeout = Time.of(5, TimeUnit.SECONDS);
@@ -269,13 +272,18 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
*/
public void start(final UUID leaderSessionID) throws Exception {
if (LEADER_ID_UPDATER.compareAndSet(this, null, leaderSessionID)) {
- super.start();
+ // make sure the slot pool now accepts messages for this leader
slotPool.setJobManagerLeaderId(leaderSessionID);
- log.info("Starting JobManager for job {} ({})", jobGraph.getName(), jobGraph.getJobID());
+
+ // make sure we receive RPC and async calls
+ super.start();
+
+ log.info("JobManager started as leader {} for job {}", leaderSessionID, jobGraph.getJobID());
getSelf().startJobExecution();
- } else {
- log.warn("Job already started with leaderId {}, ignoring this start request.", leaderSessionID);
+ }
+ else {
+ log.warn("Job already started with leader ID {}, ignoring this start request.", leaderSessionID);
}
}
@@ -297,48 +305,21 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
@RpcMethod
public void startJobExecution() {
- log.info("Starting execution of job {} ({}) with leaderId {}.",
- jobGraph.getName(), jobGraph.getJobID(), leaderSessionID);
+ log.info("Starting execution of job {} ({})", jobGraph.getName(), jobGraph.getJobID());
try {
- // register self as job status change listener
- executionGraph.registerJobStatusListener(new JobStatusListener() {
- @Override
- public void jobStatusChanges(
- final JobID jobId, final JobStatus newJobStatus, final long timestamp, final Throwable error)
- {
- // run in rpc thread to avoid concurrency
- runAsync(new Runnable() {
- @Override
- public void run() {
- jobStatusChanged(newJobStatus, timestamp, error);
- }
- });
- }
- });
-
// job is ready to go, try to establish connection with resource manager
+ // - activate leader retrieval for the resource manager
+ // - on notification of the leader, the connection will be established and
+ // the slot pool will start requesting slots
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
- } catch (Throwable t) {
-
- // TODO - this should not result in a job failure, but another leader should take over
- // TODO - either this master should retry the execution, or it should relinquish leadership / terminate
-
+ }
+ catch (Throwable t) {
log.error("Failed to start job {} ({})", jobGraph.getName(), jobGraph.getJobID(), t);
- executionGraph.fail(t);
-
- final JobExecutionException rt;
- if (t instanceof JobExecutionException) {
- rt = (JobExecutionException) t;
- } else {
- rt = new JobExecutionException(jobGraph.getJobID(),
- "Failed to start job " + jobGraph.getJobID() + " (" + jobGraph.getName() + ")", t);
- }
-
- // TODO: notify client about this failure
+ handleFatalError(new Exception(
+ "Could not start job execution: Failed to start leader service for Resource Manager", t));
- jobCompletionActions.jobFailed(rt);
return;
}
@@ -348,7 +329,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
public void run() {
try {
executionGraph.scheduleForExecution(new PooledSlotProvider(slotPool, allocationTimeout));
- } catch (Throwable t) {
+ }
+ catch (Throwable t) {
executionGraph.fail(t);
}
}
@@ -386,6 +368,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
}
closeResourceManagerConnection();
+ // TODO: in the future, the slot pool should not release the resources, so that
+ // TODO: the TaskManagers offer the resources to the new leader
for (ResourceID taskManagerId : registeredTaskManagers.keySet()) {
slotPool.releaseResource(taskManagerId);
}
@@ -405,14 +389,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
final UUID leaderSessionID,
final TaskExecutionState taskExecutionState) throws Exception
{
- if (taskExecutionState == null) {
- throw new NullPointerException("TaskExecutionState must not be null.");
- }
-
- if (!this.leaderSessionID.equals(leaderSessionID)) {
- throw new Exception("Leader id not match, expected: " + this.leaderSessionID
- + ", actual: " + leaderSessionID);
- }
+ checkNotNull(taskExecutionState, "taskExecutionState");
+ validateLeaderSessionId(leaderSessionID);
if (executionGraph.updateState(taskExecutionState)) {
return Acknowledge.get();
@@ -428,10 +406,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
final JobVertexID vertexID,
final ExecutionAttemptID executionAttempt) throws Exception
{
- if (!this.leaderSessionID.equals(leaderSessionID)) {
- throw new Exception("Leader id not match, expected: " + this.leaderSessionID
- + ", actual: " + leaderSessionID);
- }
+ validateLeaderSessionId(leaderSessionID);
final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
if (execution == null) {
@@ -477,16 +452,12 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
}
}
- @RpcMethod
public ExecutionState requestPartitionState(
final UUID leaderSessionID,
final IntermediateDataSetID intermediateResultId,
final ResultPartitionID resultPartitionId) throws Exception {
- if (!this.leaderSessionID.equals(leaderSessionID)) {
- throw new Exception("Leader id not match, expected: " + this.leaderSessionID
- + ", actual: " + leaderSessionID);
- }
+ validateLeaderSessionId(leaderSessionID);
final Execution execution = executionGraph.getRegisteredExecutions().get(resultPartitionId.getProducerId());
if (execution != null) {
@@ -520,10 +491,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
final UUID leaderSessionID,
final ResultPartitionID partitionID) throws Exception
{
- if (!this.leaderSessionID.equals(leaderSessionID)) {
- throw new Exception("Leader id not match, expected: " + this.leaderSessionID
- + ", actual: " + leaderSessionID);
- }
+ validateLeaderSessionId(leaderSessionID);
executionGraph.scheduleOrUpdateConsumers(partitionID);
return Acknowledge.get();
@@ -534,6 +502,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
throw new UnsupportedOperationException();
}
+ // TODO: This method needs a leader session ID
@RpcMethod
public void acknowledgeCheckpoint(
final JobID jobID,
@@ -562,6 +531,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
}
}
+ // TODO: This method needs a leader session ID
@RpcMethod
public void declineCheckpoint(
final JobID jobID,
@@ -657,10 +627,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
public Iterable<SlotOffer> offerSlots(final ResourceID taskManagerId,
final Iterable<SlotOffer> slots, final UUID leaderId) throws Exception
{
- if (!this.leaderSessionID.equals(leaderId)) {
- throw new Exception("Leader id not match, expected: " + this.leaderSessionID
- + ", actual: " + leaderId);
- }
+ validateLeaderSessionId(leaderSessionID);
Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManager = registeredTaskManagers.get(taskManagerId);
if (taskManager == null) {
@@ -689,10 +656,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
final UUID leaderId,
final Exception cause) throws Exception
{
- if (!this.leaderSessionID.equals(leaderId)) {
- throw new Exception("Leader id not match, expected: " + this.leaderSessionID
- + ", actual: " + leaderId);
- }
+ validateLeaderSessionId(leaderSessionID);
if (!registeredTaskManagers.containsKey(taskManagerId)) {
throw new Exception("Unknown TaskManager " + taskManagerId);
@@ -782,62 +746,55 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
}
private void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) {
+ validateRunsInMainThread();
+
final JobID jobID = executionGraph.getJobID();
final String jobName = executionGraph.getJobName();
+
log.info("Status of job {} ({}) changed to {}.", jobID, jobName, newJobStatus, error);
if (newJobStatus.isGloballyTerminalState()) {
- // TODO set job end time in JobInfo
-
- /*
- TODO
- if (jobInfo.sessionAlive) {
- jobInfo.setLastActive()
- val lastActivity = jobInfo.lastActive
- context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) {
- // remove only if no activity occurred in the meantime
- if (lastActivity == jobInfo.lastActive) {
- self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true))
- }
- }(context.dispatcher)
- } else {
- self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true))
- }
- */
-
- if (newJobStatus == JobStatus.FINISHED) {
- try {
- final Map<String, SerializedValue<Object>> accumulatorResults =
- executionGraph.getAccumulatorsSerialized();
- final SerializedJobExecutionResult result = new SerializedJobExecutionResult(
- jobID, 0, accumulatorResults // TODO get correct job duration
- );
- jobCompletionActions.jobFinished(result.toJobExecutionResult(userCodeLoader));
- } catch (Exception e) {
- log.error("Cannot fetch final accumulators for job {} ({})", jobName, jobID, e);
+ switch (newJobStatus) {
+ case FINISHED:
+ try {
+ // TODO get correct job duration
+ // job done, let's get the accumulators
+ Map<String, Object> accumulatorResults = executionGraph.getAccumulators();
+ JobExecutionResult result = new JobExecutionResult(jobID, 0L, accumulatorResults);
+
+ jobCompletionActions.jobFinished(result);
+ }
+ catch (Exception e) {
+ log.error("Cannot fetch final accumulators for job {} ({})", jobName, jobID, e);
+
+ final JobExecutionException exception = new JobExecutionException(jobID,
+ "Failed to retrieve accumulator results. " +
+ "The job is registered as 'FINISHED (successful), but this notification describes " +
+ "a failure, since the resulting accumulators could not be fetched.", e);
+
+ jobCompletionActions.jobFailed(exception);
+ }
+ break;
+
+ case CANCELED: {
final JobExecutionException exception = new JobExecutionException(
- jobID, "Failed to retrieve accumulator results.", e);
- // TODO should we also notify client?
+ jobID, "Job was cancelled.", new Exception("The job was cancelled"));
+
jobCompletionActions.jobFailed(exception);
+ break;
}
- } else if (newJobStatus == JobStatus.CANCELED) {
- final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader);
- final JobExecutionException exception = new JobExecutionException(
- jobID, "Job was cancelled.", unpackedError);
- // TODO should we also notify client?
- jobCompletionActions.jobFailed(exception);
- } else if (newJobStatus == JobStatus.FAILED) {
- final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader);
- final JobExecutionException exception = new JobExecutionException(
- jobID, "Job execution failed.", unpackedError);
- // TODO should we also notify client?
- jobCompletionActions.jobFailed(exception);
- } else {
- final JobExecutionException exception = new JobExecutionException(
- jobID, newJobStatus + " is not a terminal state.");
- // TODO should we also notify client?
- jobCompletionActions.jobFailed(exception);
- throw new RuntimeException(exception);
+
+ case FAILED: {
+ final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader);
+ final JobExecutionException exception = new JobExecutionException(
+ jobID, "Job execution failed.", unpackedError);
+ jobCompletionActions.jobFailed(exception);
+ break;
+ }
+
+ default:
+ // this can happen only if the enum is buggy
+ throw new IllegalStateException(newJobStatus.toString());
}
}
}
@@ -845,57 +802,52 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
private void notifyOfNewResourceManagerLeader(
final String resourceManagerAddress, final UUID resourceManagerLeaderId)
{
- // IMPORTANT: executed by main thread to avoid concurrence
- runAsync(new Runnable() {
- @Override
- public void run() {
- if (resourceManagerConnection != null) {
- if (resourceManagerAddress != null) {
- if (resourceManagerAddress.equals(resourceManagerConnection.getTargetAddress())
- && resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId())) {
- // both address and leader id are not changed, we can keep the old connection
- return;
- }
- log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
- resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
- } else {
- log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
- resourceManagerConnection.getTargetAddress());
- }
- }
+ validateRunsInMainThread();
- closeResourceManagerConnection();
-
- if (resourceManagerAddress != null) {
- log.info("Attempting to register at ResourceManager {}", resourceManagerAddress);
- resourceManagerConnection = new ResourceManagerConnection(
- log, jobGraph.getJobID(), leaderSessionID,
- resourceManagerAddress, resourceManagerLeaderId, executionContext);
- resourceManagerConnection.start();
+ if (resourceManagerConnection != null) {
+ if (resourceManagerAddress != null) {
+ if (resourceManagerAddress.equals(resourceManagerConnection.getTargetAddress())
+ && resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId())) {
+ // both address and leader id are not changed, we can keep the old connection
+ return;
}
+ log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
+ resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
+ } else {
+ log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
+ resourceManagerConnection.getTargetAddress());
}
- });
+ }
+
+ closeResourceManagerConnection();
+
+ if (resourceManagerAddress != null) {
+ log.info("Attempting to register at ResourceManager {}", resourceManagerAddress);
+ resourceManagerConnection = new ResourceManagerConnection(
+ log, jobGraph.getJobID(), getAddress(), leaderSessionID,
+ resourceManagerAddress, resourceManagerLeaderId, executionContext);
+ resourceManagerConnection.start();
+ }
}
private void onResourceManagerRegistrationSuccess(final JobMasterRegistrationSuccess success) {
- getRpcService().execute(new Runnable() {
- @Override
- public void run() {
- // TODO - add tests for comment in https://github.com/apache/flink/pull/2565
- // verify the response with current connection
- if (resourceManagerConnection != null
- && resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId()))
- {
- log.info("JobManager successfully registered at ResourceManager, leader id: {}.",
- success.getResourceManagerLeaderId());
- slotPool.setResourceManager(success.getResourceManagerLeaderId(),
- resourceManagerConnection.getTargetGateway());
- }
- }
- });
+ validateRunsInMainThread();
+
+ // verify the response with current connection
+ if (resourceManagerConnection != null
+ && resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId()))
+ {
+ log.info("JobManager successfully registered at ResourceManager, leader id: {}.",
+ success.getResourceManagerLeaderId());
+
+ slotPool.setResourceManager(
+ success.getResourceManagerLeaderId(), resourceManagerConnection.getTargetGateway());
+ }
}
private void closeResourceManagerConnection() {
+ validateRunsInMainThread();
+
if (resourceManagerConnection != null) {
resourceManagerConnection.close();
resourceManagerConnection = null;
@@ -903,32 +855,49 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
slotPool.disconnectResourceManager();
}
+ private void validateLeaderSessionId(UUID leaderSessionID) throws LeaderIdMismatchException {
+ if (this.leaderSessionID == null || !this.leaderSessionID.equals(leaderSessionID)) {
+ throw new LeaderIdMismatchException(this.leaderSessionID, leaderSessionID);
+ }
+ }
+
//----------------------------------------------------------------------------------------------
// Utility classes
//----------------------------------------------------------------------------------------------
private class ResourceManagerLeaderListener implements LeaderRetrievalListener {
+
@Override
public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
- notifyOfNewResourceManagerLeader(leaderAddress, leaderSessionID);
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ notifyOfNewResourceManagerLeader(leaderAddress, leaderSessionID);
+ }
+ });
}
@Override
public void handleError(final Exception exception) {
- handleFatalError(exception);
+ handleFatalError(new Exception("Fatal error in the ResourceManager leader service", exception));
}
}
+ //----------------------------------------------------------------------------------------------
+
private class ResourceManagerConnection
extends RegisteredRpcConnection<ResourceManagerGateway, JobMasterRegistrationSuccess>
{
private final JobID jobID;
+ private final String jobManagerRpcAddress;
+
private final UUID jobManagerLeaderID;
ResourceManagerConnection(
final Logger log,
final JobID jobID,
+ final String jobManagerRpcAddress,
final UUID jobManagerLeaderID,
final String resourceManagerAddress,
final UUID resourceManagerLeaderID,
@@ -936,6 +905,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
{
super(log, resourceManagerAddress, resourceManagerLeaderID, executor);
this.jobID = checkNotNull(jobID);
+ this.jobManagerRpcAddress = checkNotNull(jobManagerRpcAddress);
this.jobManagerLeaderID = checkNotNull(jobManagerLeaderID);
}
@@ -946,18 +916,29 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
getTargetAddress(), getTargetLeaderId())
{
@Override
- protected Future<RegistrationResponse> invokeRegistration(ResourceManagerGateway gateway, UUID leaderId,
- long timeoutMillis) throws Exception
+ protected Future<RegistrationResponse> invokeRegistration(
+ ResourceManagerGateway gateway, UUID leaderId, long timeoutMillis) throws Exception
{
Time timeout = Time.milliseconds(timeoutMillis);
- return gateway.registerJobManager(leaderId, jobManagerLeaderID, getAddress(), jobID, timeout);
+
+ return gateway.registerJobManager(
+ leaderId,
+ jobManagerLeaderID,
+ jobManagerRpcAddress,
+ jobID,
+ timeout);
}
};
}
@Override
protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) {
- onResourceManagerRegistrationSuccess(success);
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ onResourceManagerRegistrationSuccess(success);
+ }
+ });
}
@Override
@@ -965,4 +946,25 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
handleFatalError(failure);
}
}
+
+ //----------------------------------------------------------------------------------------------
+
+ private class JobManagerJobStatusListener implements JobStatusListener {
+
+ @Override
+ public void jobStatusChanges(
+ final JobID jobId,
+ final JobStatus newJobStatus,
+ final long timestamp,
+ final Throwable error) {
+
+ // run in rpc thread to avoid concurrency
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ jobStatusChanged(newJobStatus, timestamp, error);
+ }
+ });
+ }
+ }
}