You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink on the original archive page and is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:21 UTC
[02/52] [abbrv] flink git commit: [FLINK-4375] [distributed coordination] Implement new JobManager creation, initialization, and basic RPC methods
http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index b94f904..abc59cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -18,26 +18,19 @@
package org.apache.flink.runtime.jobmaster;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
-import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
-import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
-import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.client.SerializedJobExecutionResult;
@@ -48,9 +41,10 @@ import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -59,16 +53,11 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
-import org.apache.flink.runtime.jobmaster.message.DisposeSavepointResponse;
-import org.apache.flink.runtime.jobmaster.message.TriggerSavepointResponse;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
@@ -84,22 +73,26 @@ import org.apache.flink.runtime.registration.RegisteredRpcConnection;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.registration.RetryingRegistration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.StartStoppable;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.util.SerializedThrowable;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.SerializedValue;
+
import org.slf4j.Logger;
+import javax.annotation.Nullable;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -110,16 +103,21 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* It offers the following methods as part of its rpc interface to interact with the JobMaster
* remotely:
* <ul>
- * <li>{@link #updateTaskExecutionState(TaskExecutionState)} updates the task execution state for
+ * <li>{@link #updateTaskExecutionState} updates the task execution state for
* given task</li>
* </ul>
*/
public class JobMaster extends RpcEndpoint<JobMasterGateway> {
+ private static final AtomicReferenceFieldUpdater<JobMaster, UUID> LEADER_ID_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(JobMaster.class, UUID.class, "leaderSessionID");
+
+ // ------------------------------------------------------------------------
+
/** Logical representation of the job */
private final JobGraph jobGraph;
- /** Configuration of the job */
+ /** Configuration of the JobManager */
private final Configuration configuration;
/** Service to contend for and retrieve the leadership of JM and RM */
@@ -128,37 +126,24 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
/** Blob cache manager used across jobs */
private final BlobLibraryCacheManager libraryCacheManager;
- /** Factory to create restart strategy for this job */
- private final RestartStrategyFactory restartStrategyFactory;
-
- /** Store for save points */
- private final SavepointStore savepointStore;
-
- /** The timeout for this job */
- private final Time timeout;
-
- /** The scheduler to use for scheduling new tasks as they are needed */
- private final Scheduler scheduler;
+ /** The metrics for the JobManager itself */
+ private final MetricGroup jobManagerMetricGroup;
- /** The metrics group used across jobs */
- private final JobManagerMetricGroup jobManagerMetricGroup;
+ /** The metrics for the job */
+ private final MetricGroup jobMetricGroup;
/** The execution context which is used to execute futures */
- private final Executor executionContext;
+ private final ExecutorService executionContext;
private final OnCompletionActions jobCompletionActions;
- /** The execution graph of this job */
- private volatile ExecutionGraph executionGraph;
-
- /** The checkpoint recovery factory used by this job */
- private CheckpointRecoveryFactory checkpointRecoveryFactory;
+ private final FatalErrorHandler errorHandler;
- private ClassLoader userCodeLoader;
+ private final ClassLoader userCodeLoader;
- private RestartStrategy restartStrategy;
+ /** The execution graph of this job */
+ private final ExecutionGraph executionGraph;
- private MetricGroup jobMetrics;
private volatile UUID leaderSessionID;
@@ -168,22 +153,26 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
private LeaderRetrievalService resourceManagerLeaderRetriever;
/** Connection with ResourceManager, null if not located address yet or we close it initiative */
- private volatile ResourceManagerConnection resourceManagerConnection;
+ private ResourceManagerConnection resourceManagerConnection;
+
+ // TODO - we need to replace this with the slot pool
+ private final Scheduler scheduler;
// ------------------------------------------------------------------------
public JobMaster(
- JobGraph jobGraph,
- Configuration configuration,
- RpcService rpcService,
- HighAvailabilityServices highAvailabilityService,
- BlobLibraryCacheManager libraryCacheManager,
- RestartStrategyFactory restartStrategyFactory,
- SavepointStore savepointStore,
- Time timeout,
- Scheduler scheduler,
- JobManagerMetricGroup jobManagerMetricGroup,
- OnCompletionActions jobCompletionActions)
+ JobGraph jobGraph,
+ Configuration configuration,
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityService,
+ ExecutorService executorService,
+ BlobLibraryCacheManager libraryCacheManager,
+ RestartStrategyFactory restartStrategyFactory,
+ Time rpcAskTimeout,
+ @Nullable JobManagerMetricGroup jobManagerMetricGroup,
+ OnCompletionActions jobCompletionActions,
+ FatalErrorHandler errorHandler,
+ ClassLoader userCodeLoader) throws Exception
{
super(rpcService);
@@ -191,293 +180,150 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
this.configuration = checkNotNull(configuration);
this.highAvailabilityServices = checkNotNull(highAvailabilityService);
this.libraryCacheManager = checkNotNull(libraryCacheManager);
- this.restartStrategyFactory = checkNotNull(restartStrategyFactory);
- this.savepointStore = checkNotNull(savepointStore);
- this.timeout = checkNotNull(timeout);
- this.scheduler = checkNotNull(scheduler);
- this.jobManagerMetricGroup = checkNotNull(jobManagerMetricGroup);
- this.executionContext = checkNotNull(rpcService.getExecutor());
+ this.executionContext = checkNotNull(executorService);
this.jobCompletionActions = checkNotNull(jobCompletionActions);
- }
-
- //----------------------------------------------------------------------------------------------
- // Lifecycle management
- //----------------------------------------------------------------------------------------------
+ this.errorHandler = checkNotNull(errorHandler);
+ this.userCodeLoader = checkNotNull(userCodeLoader);
- /**
- * Initializing the job execution environment, should be called before start. Any error occurred during
- * initialization will be treated as job submission failure.
- *
- * @throws JobSubmissionException
- */
- public void init() throws JobSubmissionException {
- log.info("Initializing job {} ({}).", jobGraph.getName(), jobGraph.getJobID());
+ final String jobName = jobGraph.getName();
+ final JobID jid = jobGraph.getJobID();
- try {
- // IMPORTANT: We need to make sure that the library registration is the first action,
- // because this makes sure that the uploaded jar files are removed in case of
- // unsuccessful
- try {
- libraryCacheManager.registerJob(jobGraph.getJobID(), jobGraph.getUserJarBlobKeys(),
- jobGraph.getClasspaths());
- } catch (Throwable t) {
- throw new JobSubmissionException(jobGraph.getJobID(),
- "Cannot set up the user code libraries: " + t.getMessage(), t);
- }
+ if (jobManagerMetricGroup != null) {
+ this.jobManagerMetricGroup = jobManagerMetricGroup;
+ this.jobMetricGroup = jobManagerMetricGroup.addJob(jobGraph);
+ } else {
+ this.jobManagerMetricGroup = new UnregisteredMetricsGroup();
+ this.jobMetricGroup = new UnregisteredMetricsGroup();
+ }
- userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID());
- if (userCodeLoader == null) {
- throw new JobSubmissionException(jobGraph.getJobID(),
- "The user code class loader could not be initialized.");
- }
+ log.info("Initializing job {} ({}).", jobName, jid);
- if (jobGraph.getNumberOfVertices() == 0) {
- throw new JobSubmissionException(jobGraph.getJobID(), "The given job is empty");
- }
-
- final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
+ final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
jobGraph.getSerializedExecutionConfig()
- .deserializeValue(userCodeLoader)
- .getRestartStrategy();
- if (restartStrategyConfiguration != null) {
- restartStrategy = RestartStrategyFactory.createRestartStrategy(restartStrategyConfiguration);
- }
- else {
- restartStrategy = restartStrategyFactory.createRestartStrategy();
- }
+ .deserializeValue(userCodeLoader)
+ .getRestartStrategy();
- log.info("Using restart strategy {} for {} ({}).", restartStrategy, jobGraph.getName(), jobGraph.getJobID());
+ final RestartStrategy restartStrategy = (restartStrategyConfiguration != null) ?
+ RestartStrategyFactory.createRestartStrategy(restartStrategyConfiguration) :
+ restartStrategyFactory.createRestartStrategy();
- if (jobManagerMetricGroup != null) {
- jobMetrics = jobManagerMetricGroup.addJob(jobGraph);
- }
- if (jobMetrics == null) {
- jobMetrics = new UnregisteredMetricsGroup();
- }
+ log.info("Using restart strategy {} for {} ({}).", restartStrategy, jobName, jid);
- try {
- checkpointRecoveryFactory = highAvailabilityServices.getCheckpointRecoveryFactory();
- } catch (Exception e) {
- log.error("Could not get the checkpoint recovery factory.", e);
- throw new JobSubmissionException(jobGraph.getJobID(), "Could not get the checkpoint recovery factory.", e);
- }
+ CheckpointRecoveryFactory checkpointRecoveryFactory;
+ try {
+ checkpointRecoveryFactory = highAvailabilityServices.getCheckpointRecoveryFactory();
+ } catch (Exception e) {
+ log.error("Could not create the access to highly-available checkpoint storage.", e);
+ throw new Exception("Could not create the access to highly-available checkpoint storage.", e);
+ }
- try {
- resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever();
- } catch (Exception e) {
- log.error("Could not get the resource manager leader retriever.", e);
- throw new JobSubmissionException(jobGraph.getJobID(),
+ try {
+ resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever();
+ } catch (Exception e) {
+ log.error("Could not get the resource manager leader retriever.", e);
+ throw new JobSubmissionException(jobGraph.getJobID(),
"Could not get the resource manager leader retriever.", e);
- }
- } catch (Throwable t) {
- log.error("Failed to initializing job {} ({})", jobGraph.getName(), jobGraph.getJobID(), t);
-
- libraryCacheManager.unregisterJob(jobGraph.getJobID());
-
- if (t instanceof JobSubmissionException) {
- throw (JobSubmissionException) t;
- }
- else {
- throw new JobSubmissionException(jobGraph.getJobID(), "Failed to initialize job " +
- jobGraph.getName() + " (" + jobGraph.getJobID() + ")", t);
- }
}
+
+ this.executionGraph = ExecutionGraphBuilder.buildGraph(
+ null,
+ jobGraph,
+ configuration,
+ executorService,
+ executorService,
+ userCodeLoader,
+ checkpointRecoveryFactory,
+ rpcAskTimeout,
+ restartStrategy,
+ jobMetricGroup,
+ -1,
+ log);
+
+ // TODO - temp fix
+ this.scheduler = new Scheduler(executorService);
}
+ //----------------------------------------------------------------------------------------------
+ // Lifecycle management
+ //----------------------------------------------------------------------------------------------
+
+
@Override
public void start() {
- super.start();
+ throw new UnsupportedOperationException("Should never call start() without leader ID");
}
+ /**
+ * Start the rpc service and begin to run the job.
+ *
+ * @param leaderSessionID The necessary leader id for running the job.
+ */
+ public void start(final UUID leaderSessionID) {
+ if (LEADER_ID_UPDATER.compareAndSet(this, null, leaderSessionID)) {
+ super.start();
+
+ log.info("Starting JobManager for job {} ({})", jobGraph.getName(), jobGraph.getJobID());
+ getSelf().startJobExecution();
+ } else {
+ log.warn("Job already started with leaderId {}, ignoring this start request.", leaderSessionID);
+ }
+ }
+
+ /**
+ * Suspend the job and shutdown all other services including rpc.
+ */
@Override
public void shutDown() {
+ // make sure there is a graceful exit
+ getSelf().suspendExecution(new Exception("JobManager is shutting down."));
super.shutDown();
-
- suspendJob(new Exception("JobManager is shutting down."));
-
- disposeCommunicationWithResourceManager();
}
-
-
//----------------------------------------------------------------------------------------------
// RPC methods
//----------------------------------------------------------------------------------------------
- /**
- * Start to run the job, runtime data structures like ExecutionGraph will be constructed now and checkpoint
- * being recovered. After this, we will begin to schedule the job.
- */
- @RpcMethod
- public void startJob(final UUID leaderSessionID) {
- log.info("Starting job {} ({}) with leaderId {}.", jobGraph.getName(), jobGraph.getJobID(), leaderSessionID);
+ //-- job starting and stopping -----------------------------------------------------------------
- this.leaderSessionID = leaderSessionID;
+ @RpcMethod
+ public void startJobExecution() {
+ log.info("Starting execution of job {} ({}) with leaderId {}.",
+ jobGraph.getName(), jobGraph.getJobID(), leaderSessionID);
try {
- if (executionGraph != null) {
- executionGraph = new ExecutionGraph(
- executionContext,
- executionContext,
- jobGraph.getJobID(),
- jobGraph.getName(),
- jobGraph.getJobConfiguration(),
- jobGraph.getSerializedExecutionConfig(),
- timeout,
- restartStrategy,
- jobGraph.getUserJarBlobKeys(),
- jobGraph.getClasspaths(),
- userCodeLoader,
- jobMetrics);
- } else {
- // TODO: update last active time in JobInfo
- }
-
- executionGraph.setScheduleMode(jobGraph.getScheduleMode());
- executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling());
-
- try {
- executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
- } catch (Exception e) {
- log.warn("Cannot create JSON plan for job {} ({})", jobGraph.getJobID(), jobGraph.getName(), e);
- executionGraph.setJsonPlan("{}");
- }
-
- // initialize the vertices that have a master initialization hook
- // file output formats create directories here, input formats create splits
- if (log.isDebugEnabled()) {
- log.debug("Running initialization on master for job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
- }
- for (JobVertex vertex : jobGraph.getVertices()) {
- final String executableClass = vertex.getInvokableClassName();
- if (executableClass == null || executableClass.length() == 0) {
- throw new JobExecutionException(jobGraph.getJobID(),
- "The vertex " + vertex.getID() + " (" + vertex.getName() + ") has no invokable class.");
- }
- if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) {
- vertex.setParallelism(scheduler.getTotalNumberOfSlots());
- }
-
- try {
- vertex.initializeOnMaster(userCodeLoader);
- } catch (Throwable t) {
- throw new JobExecutionException(jobGraph.getJobID(),
- "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(), t);
- }
- }
-
- // topologically sort the job vertices and attach the graph to the existing one
- final List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
- if (log.isDebugEnabled()) {
- log.debug("Adding {} vertices from job graph {} ({}).", sortedTopology.size(),
- jobGraph.getJobID(), jobGraph.getName());
- }
- executionGraph.attachJobGraph(sortedTopology);
-
- if (log.isDebugEnabled()) {
- log.debug("Successfully created execution graph from job graph {} ({}).",
- jobGraph.getJobID(), jobGraph.getName());
- }
-
- final JobSnapshottingSettings snapshotSettings = jobGraph.getSnapshotSettings();
- if (snapshotSettings != null) {
- List<ExecutionJobVertex> triggerVertices = getExecutionJobVertexWithId(
- executionGraph, snapshotSettings.getVerticesToTrigger());
-
- List<ExecutionJobVertex> ackVertices = getExecutionJobVertexWithId(
- executionGraph, snapshotSettings.getVerticesToAcknowledge());
-
- List<ExecutionJobVertex> confirmVertices = getExecutionJobVertexWithId(
- executionGraph, snapshotSettings.getVerticesToConfirm());
-
- CompletedCheckpointStore completedCheckpoints = checkpointRecoveryFactory.createCheckpointStore(
- jobGraph.getJobID(), userCodeLoader);
-
- CheckpointIDCounter checkpointIdCounter = checkpointRecoveryFactory.createCheckpointIDCounter(
- jobGraph.getJobID());
-
- // Checkpoint stats tracker
- boolean isStatsDisabled = configuration.getBoolean(
- ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_DISABLE,
- ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE);
-
- final CheckpointStatsTracker checkpointStatsTracker;
- if (isStatsDisabled) {
- checkpointStatsTracker = new DisabledCheckpointStatsTracker();
- }
- else {
- int historySize = configuration.getInteger(
- ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
- ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE);
- checkpointStatsTracker = new SimpleCheckpointStatsTracker(historySize, ackVertices, jobMetrics);
- }
-
- String externalizedCheckpointsDir = configuration.getString(
- ConfigConstants.CHECKPOINTS_DIRECTORY_KEY, null);
-
- executionGraph.enableSnapshotCheckpointing(
- snapshotSettings.getCheckpointInterval(),
- snapshotSettings.getCheckpointTimeout(),
- snapshotSettings.getMinPauseBetweenCheckpoints(),
- snapshotSettings.getMaxConcurrentCheckpoints(),
- snapshotSettings.getExternalizedCheckpointSettings(),
- triggerVertices,
- ackVertices,
- confirmVertices,
- checkpointIdCounter,
- completedCheckpoints,
- externalizedCheckpointsDir,
- checkpointStatsTracker);
- }
-
- // TODO: register this class to execution graph as job status change listeners
-
- // TODO: register client as job / execution status change listeners if they are interested
-
- /*
- TODO: decide whether we should take the savepoint before recovery
-
- if (isRecovery) {
- // this is a recovery of a master failure (this master takes over)
- executionGraph.restoreLatestCheckpointedState();
- } else {
- if (snapshotSettings != null) {
- String savepointPath = snapshotSettings.getSavepointPath();
- if (savepointPath != null) {
- // got a savepoint
- log.info("Starting job from savepoint {}.", savepointPath);
-
- // load the savepoint as a checkpoint into the system
- final CompletedCheckpoint savepoint = SavepointLoader.loadAndValidateSavepoint(
- jobGraph.getJobID(), executionGraph.getAllVertices(), savepointStore, savepointPath);
- executionGraph.getCheckpointCoordinator().getCheckpointStore().addCheckpoint(savepoint);
-
- // Reset the checkpoint ID counter
- long nextCheckpointId = savepoint.getCheckpointID() + 1;
- log.info("Reset the checkpoint ID to " + nextCheckpointId);
- executionGraph.getCheckpointCoordinator().getCheckpointIdCounter().setCount(nextCheckpointId);
-
- executionGraph.restoreLatestCheckpointedState();
- }
+ // register self as job status change listener
+ executionGraph.registerJobStatusListener(new JobStatusListener() {
+ @Override
+ public void jobStatusChanges(
+ final JobID jobId, final JobStatus newJobStatus, final long timestamp, final Throwable error)
+ {
+ // run in rpc thread to avoid concurrency
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ jobStatusChanged(newJobStatus, timestamp, error);
+ }
+ });
}
- }
- */
+ });
- // job is good to go, try to locate resource manager's address
+ // job is ready to go, try to establish connection with resource manager
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
} catch (Throwable t) {
+
+ // TODO - this should not result in a job failure, but another leader should take over
+ // TODO - either this master should retry the execution, or it should relinquish leadership / terminate
+
log.error("Failed to start job {} ({})", jobGraph.getName(), jobGraph.getJobID(), t);
executionGraph.fail(t);
- executionGraph = null;
- final Throwable rt;
+ final JobExecutionException rt;
if (t instanceof JobExecutionException) {
rt = (JobExecutionException) t;
- }
- else {
+ } else {
rt = new JobExecutionException(jobGraph.getJobID(),
- "Failed to start job " + jobGraph.getJobID() + " (" + jobGraph.getName() + ")", t);
+ "Failed to start job " + jobGraph.getJobID() + " (" + jobGraph.getName() + ")", t);
}
// TODO: notify client about this failure
@@ -490,34 +336,51 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
executionContext.execute(new Runnable() {
@Override
public void run() {
- if (executionGraph != null) {
- try {
- executionGraph.scheduleForExecution(scheduler);
- } catch (Throwable t) {
- executionGraph.fail(t);
- }
+ try {
+ executionGraph.scheduleForExecution(scheduler);
+ } catch (Throwable t) {
+ executionGraph.fail(t);
}
}
});
}
/**
- * Suspending job, all the running tasks will be cancelled, and runtime data will be cleared.
+ * Suspending job, all the running tasks will be cancelled, and communication with other components
+ * will be disposed.
+ *
+ * <p>Mostly job is suspended because of the leadership has been revoked, one can be restart this job by
+ * calling the {@link #start(UUID)} method once we take the leadership back again.
*
* @param cause The reason of why this job been suspended.
*/
@RpcMethod
- public void suspendJob(final Throwable cause) {
+ public void suspendExecution(final Throwable cause) {
+ if (leaderSessionID == null) {
+ log.debug("Job has already been suspended or shutdown.");
+ return;
+ }
+
+ // receive no more messages until started again, should be called before we clear self leader id
+ ((StartStoppable) getSelf()).stop();
+
leaderSessionID = null;
+ executionGraph.suspend(cause);
- if (executionGraph != null) {
- executionGraph.suspend(cause);
- executionGraph = null;
+ // disconnect from resource manager:
+ try {
+ resourceManagerLeaderRetriever.stop();
+ } catch (Exception e) {
+ log.warn("Failed to stop resource manager leader retriever when suspending.");
}
+ closeResourceManagerConnection();
+
+ // TODO: disconnect from all registered task managers
- disposeCommunicationWithResourceManager();
}
+ //----------------------------------------------------------------------------------------------
+
/**
* Updates the task execution state for a given task.
*
@@ -525,26 +388,38 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
* @return Acknowledge the task execution state update
*/
@RpcMethod
- public Acknowledge updateTaskExecutionState(final TaskExecutionState taskExecutionState) throws ExecutionGraphException {
+ public Acknowledge updateTaskExecutionState(
+ final UUID leaderSessionID,
+ final TaskExecutionState taskExecutionState) throws Exception
+ {
if (taskExecutionState == null) {
throw new NullPointerException("TaskExecutionState must not be null.");
}
+ if (!this.leaderSessionID.equals(leaderSessionID)) {
+ throw new Exception("Leader id not match, expected: " + this.leaderSessionID
+ + ", actual: " + leaderSessionID);
+ }
+
if (executionGraph.updateState(taskExecutionState)) {
return Acknowledge.get();
} else {
throw new ExecutionGraphException("The execution attempt " +
- taskExecutionState.getID() + " was not found.");
+ taskExecutionState.getID() + " was not found.");
}
-
}
-
@RpcMethod
public SerializedInputSplit requestNextInputSplit(
- final JobVertexID vertexID,
- final ExecutionAttemptID executionAttempt) throws Exception
+ final UUID leaderSessionID,
+ final JobVertexID vertexID,
+ final ExecutionAttemptID executionAttempt) throws Exception
{
+ if (!this.leaderSessionID.equals(leaderSessionID)) {
+ throw new Exception("Leader id not match, expected: " + this.leaderSessionID
+ + ", actual: " + leaderSessionID);
+ }
+
final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
if (execution == null) {
// can happen when JobManager had already unregistered this execution upon on task failure,
@@ -583,7 +458,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
} catch (Exception ex) {
log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex);
IOException reason = new IOException("Could not serialize the next input split of class " +
- nextInputSplit.getClass() + ".", ex);
+ nextInputSplit.getClass() + ".", ex);
vertex.fail(reason);
throw reason;
}
@@ -591,16 +466,21 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
@RpcMethod
public ExecutionState requestPartitionState(
- final JobID ignored,
- final IntermediateDataSetID intermediateResultId,
- final ResultPartitionID resultPartitionId) throws PartitionProducerDisposedException {
+ final UUID leaderSessionID,
+ final IntermediateDataSetID intermediateResultId,
+ final ResultPartitionID resultPartitionId) throws Exception {
+
+ if (!this.leaderSessionID.equals(leaderSessionID)) {
+ throw new Exception("Leader id not match, expected: " + this.leaderSessionID
+ + ", actual: " + leaderSessionID);
+ }
final Execution execution = executionGraph.getRegisteredExecutions().get(resultPartitionId.getProducerId());
if (execution != null) {
return execution.getState();
}
else {
- final IntermediateResult intermediateResult =
+ final IntermediateResult intermediateResult =
executionGraph.getAllIntermediateResults().get(intermediateResultId);
if (intermediateResult != null) {
@@ -623,7 +503,15 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
}
@RpcMethod
- public Acknowledge scheduleOrUpdateConsumers(ResultPartitionID partitionID) throws ExecutionGraphException {
+ public Acknowledge scheduleOrUpdateConsumers(
+ final UUID leaderSessionID,
+ final ResultPartitionID partitionID) throws Exception
+ {
+ if (!this.leaderSessionID.equals(leaderSessionID)) {
+ throw new Exception("Leader id not match, expected: " + this.leaderSessionID
+ + ", actual: " + leaderSessionID);
+ }
+
executionGraph.scheduleOrUpdateConsumers(partitionID);
return Acknowledge.get();
}
@@ -638,171 +526,118 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
final JobID jobID,
final ExecutionAttemptID executionAttemptID,
final CheckpointMetaData checkpointInfo,
- final SubtaskState checkpointStateHandles) {
+ final SubtaskState checkpointState) throws CheckpointException {
- throw new UnsupportedOperationException();
+ final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
+ final AcknowledgeCheckpoint ackMessage =
+ new AcknowledgeCheckpoint(jobID, executionAttemptID, checkpointInfo, checkpointState);
+
+ if (checkpointCoordinator != null) {
+ getRpcService().execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ checkpointCoordinator.receiveAcknowledgeMessage(ackMessage);
+ } catch (Throwable t) {
+ log.warn("Error while processing checkpoint acknowledgement message");
+ }
+ }
+ });
+ } else {
+ log.error("Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator",
+ jobGraph.getJobID());
+ }
}
@RpcMethod
public void declineCheckpoint(
final JobID jobID,
final ExecutionAttemptID executionAttemptID,
- final long checkpointId,
- final Throwable cause) {
-
- throw new UnsupportedOperationException();
- }
-
- //----------------------------------------------------------------------------------------------
- // Internal methods
- //----------------------------------------------------------------------------------------------
-
- @RpcMethod
- public void resourceRemoved(final ResourceID resourceId, final String message) {
- // TODO: remove resource from slot pool
- }
+ final long checkpointID,
+ final Throwable reason)
+ {
+ final DeclineCheckpoint decline = new DeclineCheckpoint(
+ jobID, executionAttemptID, checkpointID, reason);
+ final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
- @RpcMethod
- public void acknowledgeCheckpoint(final AcknowledgeCheckpoint acknowledge) {
- if (executionGraph != null) {
- final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
- if (checkpointCoordinator != null) {
- getRpcService().execute(new Runnable() {
- @Override
- public void run() {
- try {
- if (!checkpointCoordinator.receiveAcknowledgeMessage(acknowledge)) {
- log.info("Received message for non-existing checkpoint {}.",
- acknowledge.getCheckpointId());
- }
- } catch (Exception e) {
- log.error("Error in CheckpointCoordinator while processing {}", acknowledge, e);
- }
+ if (checkpointCoordinator != null) {
+ getRpcService().execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ checkpointCoordinator.receiveDeclineMessage(decline);
+ } catch (Exception e) {
+ log.error("Error in CheckpointCoordinator while processing {}", decline, e);
}
- });
- }
- else {
- log.error("Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator",
- jobGraph.getJobID());
- }
+ }
+ });
} else {
- log.error("Received AcknowledgeCheckpoint for unavailable job {}", jobGraph.getJobID());
- }
- }
-
- @RpcMethod
- public void declineCheckpoint(final DeclineCheckpoint decline) {
- if (executionGraph != null) {
- final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
- if (checkpointCoordinator != null) {
- getRpcService().execute(new Runnable() {
- @Override
- public void run() {
- try {
- log.info("Received message for non-existing checkpoint {}.", decline.getCheckpointId());
- } catch (Exception e) {
- log.error("Error in CheckpointCoordinator while processing {}", decline, e);
- }
- }
- });
- } else {
- log.error("Received DeclineCheckpoint message for job {} with no CheckpointCoordinator",
+ log.error("Received DeclineCheckpoint message for job {} with no CheckpointCoordinator",
jobGraph.getJobID());
- }
- } else {
- log.error("Received AcknowledgeCheckpoint for unavailable job {}", jobGraph.getJobID());
}
}
@RpcMethod
public KvStateLocation lookupKvStateLocation(final String registrationName) throws Exception {
- if (executionGraph != null) {
- if (log.isDebugEnabled()) {
- log.debug("Lookup key-value state for job {} with registration " +
+ if (log.isDebugEnabled()) {
+ log.debug("Lookup key-value state for job {} with registration " +
"name {}.", jobGraph.getJobID(), registrationName);
- }
+ }
- final KvStateLocationRegistry registry = executionGraph.getKvStateLocationRegistry();
- final KvStateLocation location = registry.getKvStateLocation(registrationName);
- if (location != null) {
- return location;
- } else {
- throw new UnknownKvStateLocation(registrationName);
- }
+ final KvStateLocationRegistry registry = executionGraph.getKvStateLocationRegistry();
+ final KvStateLocation location = registry.getKvStateLocation(registrationName);
+ if (location != null) {
+ return location;
} else {
- throw new IllegalStateException("Received lookup KvState location request for unavailable job " +
- jobGraph.getJobID());
+ throw new UnknownKvStateLocation(registrationName);
}
}
@RpcMethod
public void notifyKvStateRegistered(
- final JobVertexID jobVertexId,
- final KeyGroupRange keyGroupRange,
- final String registrationName,
- final KvStateID kvStateId,
- final KvStateServerAddress kvStateServerAddress)
+ final JobVertexID jobVertexId,
+ final KeyGroupRange keyGroupRange,
+ final String registrationName,
+ final KvStateID kvStateId,
+ final KvStateServerAddress kvStateServerAddress)
{
- if (executionGraph != null) {
- if (log.isDebugEnabled()) {
- log.debug("Key value state registered for job {} under name {}.",
+ if (log.isDebugEnabled()) {
+ log.debug("Key value state registered for job {} under name {}.",
jobGraph.getJobID(), registrationName);
- }
- try {
- executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(
- jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress
- );
- } catch (Exception e) {
- log.error("Failed to notify KvStateRegistry about registration {}.", registrationName);
- }
- } else {
- log.error("Received notify KvState registered request for unavailable job " + jobGraph.getJobID());
+ }
+
+ try {
+ executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(
+ jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress);
+ } catch (Exception e) {
+ log.error("Failed to notify KvStateRegistry about registration {}.", registrationName, e);
}
}
@RpcMethod
public void notifyKvStateUnregistered(
- JobVertexID jobVertexId,
- KeyGroupRange keyGroupRange,
- String registrationName)
+ JobVertexID jobVertexId,
+ KeyGroupRange keyGroupRange,
+ String registrationName)
{
- if (executionGraph != null) {
- if (log.isDebugEnabled()) {
- log.debug("Key value state unregistered for job {} under name {}.",
+ if (log.isDebugEnabled()) {
+ log.debug("Key value state unregistered for job {} under name {}.",
jobGraph.getJobID(), registrationName);
- }
- try {
- executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(
- jobVertexId, keyGroupRange, registrationName
- );
- } catch (Exception e) {
- log.error("Failed to notify KvStateRegistry about registration {}.", registrationName);
- }
- } else {
- log.error("Received notify KvState unregistered request for unavailable job " + jobGraph.getJobID());
}
- }
- @RpcMethod
- public Future<TriggerSavepointResponse> triggerSavepoint() throws Exception {
- return null;
- }
-
- @RpcMethod
- public DisposeSavepointResponse disposeSavepoint(final String savepointPath) {
- // TODO
- return null;
+ try {
+ executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(
+ jobVertexId, keyGroupRange, registrationName);
+ } catch (Exception e) {
+ log.error("Failed to notify KvStateRegistry about unregistration {}.", registrationName, e);
+ }
}
@RpcMethod
public ClassloadingProps requestClassloadingProps() throws Exception {
- if (executionGraph != null) {
- return new ClassloadingProps(libraryCacheManager.getBlobServerPort(),
+ return new ClassloadingProps(libraryCacheManager.getBlobServerPort(),
executionGraph.getRequiredJarFiles(),
executionGraph.getRequiredClasspaths());
- } else {
- throw new Exception("Received classloading props request for unavailable job " + jobGraph.getJobID());
- }
}
//----------------------------------------------------------------------------------------------
@@ -815,12 +650,11 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
public void run() {
log.error("Fatal error occurred on JobManager, cause: {}", cause.getMessage(), cause);
shutDown();
- jobCompletionActions.onFatalError(cause);
+ errorHandler.onFatalError(cause);
}
});
}
- // TODO - wrap this as StatusListenerMessenger's callback with rpc main thread
private void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) {
final JobID jobID = executionGraph.getJobID();
final String jobName = executionGraph.getJobName();
@@ -848,36 +682,33 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
if (newJobStatus == JobStatus.FINISHED) {
try {
final Map<String, SerializedValue<Object>> accumulatorResults =
- executionGraph.getAccumulatorsSerialized();
+ executionGraph.getAccumulatorsSerialized();
final SerializedJobExecutionResult result = new SerializedJobExecutionResult(
- jobID, 0, accumulatorResults // TODO get correct job duration
+ jobID, 0, accumulatorResults // TODO get correct job duration
);
jobCompletionActions.jobFinished(result.toJobExecutionResult(userCodeLoader));
} catch (Exception e) {
log.error("Cannot fetch final accumulators for job {} ({})", jobName, jobID, e);
final JobExecutionException exception = new JobExecutionException(
- jobID, "Failed to retrieve accumulator results.", e);
+ jobID, "Failed to retrieve accumulator results.", e);
// TODO should we also notify client?
jobCompletionActions.jobFailed(exception);
}
- }
- else if (newJobStatus == JobStatus.CANCELED) {
+ } else if (newJobStatus == JobStatus.CANCELED) {
final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader);
final JobExecutionException exception = new JobExecutionException(
- jobID, "Job was cancelled.", unpackedError);
+ jobID, "Job was cancelled.", unpackedError);
// TODO should we also notify client?
jobCompletionActions.jobFailed(exception);
- }
- else if (newJobStatus == JobStatus.FAILED) {
+ } else if (newJobStatus == JobStatus.FAILED) {
final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader);
final JobExecutionException exception = new JobExecutionException(
- jobID, "Job execution failed.", unpackedError);
+ jobID, "Job execution failed.", unpackedError);
// TODO should we also notify client?
jobCompletionActions.jobFailed(exception);
- }
- else {
+ } else {
final JobExecutionException exception = new JobExecutionException(
- jobID, newJobStatus + " is not a terminal state.");
+ jobID, newJobStatus + " is not a terminal state.");
// TODO should we also notify client?
jobCompletionActions.jobFailed(exception);
throw new RuntimeException(exception);
@@ -886,7 +717,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
}
private void notifyOfNewResourceManagerLeader(
- final String resourceManagerAddress, final UUID resourceManagerLeaderId)
+ final String resourceManagerAddress, final UUID resourceManagerLeaderId)
{
// IMPORTANT: executed by main thread to avoid concurrence
runAsync(new Runnable() {
@@ -895,17 +726,15 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
if (resourceManagerConnection != null) {
if (resourceManagerAddress != null) {
if (resourceManagerAddress.equals(resourceManagerConnection.getTargetAddress())
- && resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId()))
- {
+ && resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId())) {
// both address and leader id are not changed, we can keep the old connection
return;
}
log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
- resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
- }
- else {
+ resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
+ } else {
log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
- resourceManagerConnection.getTargetAddress());
+ resourceManagerConnection.getTargetAddress());
}
}
@@ -914,8 +743,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
if (resourceManagerAddress != null) {
log.info("Attempting to register at ResourceManager {}", resourceManagerAddress);
resourceManagerConnection = new ResourceManagerConnection(
- log, jobGraph.getJobID(), leaderSessionID,
- resourceManagerAddress, resourceManagerLeaderId, executionContext);
+ log, jobGraph.getJobID(), leaderSessionID,
+ resourceManagerAddress, resourceManagerLeaderId, executionContext);
resourceManagerConnection.start();
}
}
@@ -929,26 +758,14 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
// TODO - add tests for comment in https://github.com/apache/flink/pull/2565
// verify the response with current connection
if (resourceManagerConnection != null
- && resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId())) {
+ && resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId())) {
log.info("JobManager successfully registered at ResourceManager, leader id: {}.",
- success.getResourceManagerLeaderId());
+ success.getResourceManagerLeaderId());
}
}
});
}
- private void disposeCommunicationWithResourceManager() {
- // 1. stop the leader retriever so we will not receiving updates anymore
- try {
- resourceManagerLeaderRetriever.stop();
- } catch (Exception e) {
- log.warn("Failed to stop resource manager leader retriever.");
- }
-
- // 2. close current connection with ResourceManager if exists
- closeResourceManagerConnection();
- }
-
private void closeResourceManagerConnection() {
if (resourceManagerConnection != null) {
resourceManagerConnection.close();
@@ -957,34 +774,6 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
}
//----------------------------------------------------------------------------------------------
- // Helper methods
- //----------------------------------------------------------------------------------------------
-
- /**
- * Converts JobVertexIDs to corresponding ExecutionJobVertexes
- *
- * @param executionGraph The execution graph that holds the relationship
- * @param vertexIDs The vertexIDs need to be converted
- * @return The corresponding ExecutionJobVertexes
- * @throws JobExecutionException
- */
- private static List<ExecutionJobVertex> getExecutionJobVertexWithId(
- final ExecutionGraph executionGraph, final List<JobVertexID> vertexIDs)
- throws JobExecutionException
- {
- final List<ExecutionJobVertex> ret = new ArrayList<>(vertexIDs.size());
- for (JobVertexID vertexID : vertexIDs) {
- final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(vertexID);
- if (executionJobVertex == null) {
- throw new JobExecutionException(executionGraph.getJobID(),
- "The snapshot checkpointing settings refer to non-existent vertex " + vertexID);
- }
- ret.add(executionJobVertex);
- }
- return ret;
- }
-
- //----------------------------------------------------------------------------------------------
// Utility classes
//----------------------------------------------------------------------------------------------
@@ -1001,19 +790,19 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
}
private class ResourceManagerConnection
- extends RegisteredRpcConnection<ResourceManagerGateway, JobMasterRegistrationSuccess>
+ extends RegisteredRpcConnection<ResourceManagerGateway, JobMasterRegistrationSuccess>
{
private final JobID jobID;
private final UUID jobManagerLeaderID;
ResourceManagerConnection(
- final Logger log,
- final JobID jobID,
- final UUID jobManagerLeaderID,
- final String resourceManagerAddress,
- final UUID resourceManagerLeaderID,
- final Executor executor)
+ final Logger log,
+ final JobID jobID,
+ final UUID jobManagerLeaderID,
+ final String resourceManagerAddress,
+ final UUID resourceManagerLeaderID,
+ final Executor executor)
{
super(log, resourceManagerAddress, resourceManagerLeaderID, executor);
this.jobID = checkNotNull(jobID);
@@ -1023,12 +812,12 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
@Override
protected RetryingRegistration<ResourceManagerGateway, JobMasterRegistrationSuccess> generateRegistration() {
return new RetryingRegistration<ResourceManagerGateway, JobMasterRegistrationSuccess>(
- log, getRpcService(), "ResourceManager", ResourceManagerGateway.class,
- getTargetAddress(), getTargetLeaderId())
+ log, getRpcService(), "ResourceManager", ResourceManagerGateway.class,
+ getTargetAddress(), getTargetLeaderId())
{
@Override
protected Future<RegistrationResponse> invokeRegistration(ResourceManagerGateway gateway, UUID leaderId,
- long timeoutMillis) throws Exception
+ long timeoutMillis) throws Exception
{
Time timeout = Time.milliseconds(timeoutMillis);
return gateway.registerJobMaster(leaderId, jobManagerLeaderID, getAddress(), jobID, timeout);
http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 5223b3e..daa33a3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.jobmaster;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -32,11 +31,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
-import org.apache.flink.runtime.jobmaster.message.DisposeSavepointResponse;
-import org.apache.flink.runtime.jobmaster.message.TriggerSavepointResponse;
import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
-import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.query.KvStateID;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.KvStateServerAddress;
@@ -52,52 +47,54 @@ import java.util.UUID;
*/
public interface JobMasterGateway extends CheckpointCoordinatorGateway {
- /**
- * Starting the job under the given leader session ID.
- */
- void startJob(final UUID leaderSessionID);
+ // ------------------------------------------------------------------------
+ // Job start and stop methods
+ // ------------------------------------------------------------------------
- /**
- * Suspending job, all the running tasks will be cancelled, and runtime status will be cleared.
- * Should re-submit the job before restarting it.
- *
- * @param cause The reason of why this job been suspended.
- */
- void suspendJob(final Throwable cause);
+ void startJobExecution();
+
+ void suspendExecution(Throwable cause);
+
+ // ------------------------------------------------------------------------
/**
* Updates the task execution state for a given task.
*
+ * @param leaderSessionID The leader id of JobManager
* @param taskExecutionState New task execution state for a given task
* @return Future flag of the task execution state update result
*/
- Future<Acknowledge> updateTaskExecutionState(TaskExecutionState taskExecutionState);
+ Future<Acknowledge> updateTaskExecutionState(
+ final UUID leaderSessionID,
+ final TaskExecutionState taskExecutionState);
/**
* Requesting next input split for the {@link ExecutionJobVertex}. The next input split is sent back to the sender
* as a {@link SerializedInputSplit} message.
*
+ * @param leaderSessionID The leader id of JobManager
* @param vertexID The job vertex id
* @param executionAttempt The execution attempt id
* @return The future of the input split. If there is no further input split, will return an empty object.
*/
Future<SerializedInputSplit> requestNextInputSplit(
- final JobVertexID vertexID,
- final ExecutionAttemptID executionAttempt);
+ final UUID leaderSessionID,
+ final JobVertexID vertexID,
+ final ExecutionAttemptID executionAttempt);
/**
- * Requests the current state of the producer of an intermediate result partition.
+ * Requests the current state of the partition.
* The state of a partition is currently bound to the state of the producing execution.
*
- * @param jobId TheID of job that the intermediate result partition belongs to.
+ * @param leaderSessionID The leader id of JobManager
* @param intermediateResultId The execution attempt ID of the task requesting the partition state.
* @param partitionId The partition ID of the partition to request the state of.
* @return The future of the partition state
*/
Future<ExecutionState> requestPartitionState(
- JobID jobId,
- IntermediateDataSetID intermediateResultId,
- ResultPartitionID partitionId);
+ final UUID leaderSessionID,
+ final IntermediateDataSetID intermediateResultId,
+ final ResultPartitionID partitionId);
/**
* Notifies the JobManager about available data for a produced partition.
@@ -108,11 +105,15 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
* <p>
* The JobManager then can decide when to schedule the partition consumers of the given session.
*
- * @param partitionID The partition which has already produced data
- * @param timeout before the rpc call fails
+ * @param leaderSessionID The leader id of JobManager
+ * @param partitionID The partition which has already produced data
+ * @param timeout Timeout before the rpc call fails
* @return Future acknowledge of the schedule or update operation
*/
- Future<Acknowledge> scheduleOrUpdateConsumers(final ResultPartitionID partitionID, @RpcTimeout final Time timeout);
+ Future<Acknowledge> scheduleOrUpdateConsumers(
+ final UUID leaderSessionID,
+ final ResultPartitionID partitionID,
+ @RpcTimeout final Time timeout);
/**
* Disconnects the given {@link org.apache.flink.runtime.taskexecutor.TaskExecutor} from the
@@ -123,36 +124,12 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
void disconnectTaskManager(ResourceID resourceID);
/**
- * Notifies the JobManager about the removal of a resource.
- *
- * @param resourceId The ID under which the resource is registered.
- * @param message Optional message with details, for logging and debugging.
- */
-
- void resourceRemoved(final ResourceID resourceId, final String message);
-
- /**
- * Notifies the JobManager that the checkpoint of an individual task is completed.
- *
- * @param acknowledge The acknowledge message of the checkpoint
- */
- void acknowledgeCheckpoint(final AcknowledgeCheckpoint acknowledge);
-
- /**
- * Notifies the JobManager that a checkpoint request could not be heeded.
- * This can happen if a Task is already in RUNNING state but is internally not yet ready to perform checkpoints.
- *
- * @param decline The decline message of the checkpoint
- */
- void declineCheckpoint(final DeclineCheckpoint decline);
-
- /**
* Requests a {@link KvStateLocation} for the specified {@link KvState} registration name.
*
* @param registrationName Name under which the KvState has been registered.
* @return Future of the requested {@link KvState} location
*/
- Future<KvStateLocation> lookupKvStateLocation(final String registrationName) throws Exception;
+ Future<KvStateLocation> lookupKvStateLocation(final String registrationName);
/**
* @param jobVertexId JobVertexID the KvState instance belongs to.
@@ -162,11 +139,11 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
* @param kvStateServerAddress Server address where to find the KvState instance.
*/
void notifyKvStateRegistered(
- final JobVertexID jobVertexId,
- final KeyGroupRange keyGroupRange,
- final String registrationName,
- final KvStateID kvStateId,
- final KvStateServerAddress kvStateServerAddress);
+ final JobVertexID jobVertexId,
+ final KeyGroupRange keyGroupRange,
+ final String registrationName,
+ final KvStateID kvStateId,
+ final KvStateServerAddress kvStateServerAddress);
/**
* @param jobVertexId JobVertexID the KvState instance belongs to.
@@ -174,24 +151,9 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
* @param registrationName Name under which the KvState has been registered.
*/
void notifyKvStateUnregistered(
- JobVertexID jobVertexId,
- KeyGroupRange keyGroupRange,
- String registrationName);
-
- /**
- * Notifies the JobManager to trigger a savepoint for this job.
- *
- * @return Future of the savepoint trigger response.
- */
- Future<TriggerSavepointResponse> triggerSavepoint();
-
- /**
- * Notifies the Jobmanager to dispose specified savepoint.
- *
- * @param savepointPath The path of the savepoint.
- * @return The future of the savepoint disponse response.
- */
- Future<DisposeSavepointResponse> disposeSavepoint(final String savepointPath);
+ JobVertexID jobVertexId,
+ KeyGroupRange keyGroupRange,
+ String registrationName);
/**
* Request the classloading props of this job.
http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
index e8fb5bb..019ccfe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
@@ -25,6 +25,8 @@ import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.slf4j.Logger;
@@ -62,6 +64,9 @@ public class MiniClusterJobDispatcher {
/** al the services that the JobManager needs, such as BLOB service, factories, etc */
private final JobManagerServices jobManagerServices;
+ /** Registry for all metrics in the mini cluster */
+ private final MetricRegistry metricRegistry;
+
/** The number of JobManagers to launch (more than one simulates a high-availability setup) */
private final int numJobManagers;
@@ -86,8 +91,9 @@ public class MiniClusterJobDispatcher {
public MiniClusterJobDispatcher(
Configuration config,
RpcService rpcService,
- HighAvailabilityServices haServices) throws Exception {
- this(config, rpcService, haServices, 1);
+ HighAvailabilityServices haServices,
+ MetricRegistry metricRegistry) throws Exception {
+ this(config, rpcService, haServices, metricRegistry, 1);
}
/**
@@ -106,16 +112,18 @@ public class MiniClusterJobDispatcher {
Configuration config,
RpcService rpcService,
HighAvailabilityServices haServices,
+ MetricRegistry metricRegistry,
int numJobManagers) throws Exception {
checkArgument(numJobManagers >= 1);
this.configuration = checkNotNull(config);
this.rpcService = checkNotNull(rpcService);
this.haServices = checkNotNull(haServices);
+ this.metricRegistry = checkNotNull(metricRegistry);
this.numJobManagers = numJobManagers;
LOG.info("Creating JobMaster services");
- this.jobManagerServices = JobManagerServices.fromConfiguration(config);
+ this.jobManagerServices = JobManagerServices.fromConfiguration(config, haServices);
}
// ------------------------------------------------------------------------
@@ -140,9 +148,8 @@ public class MiniClusterJobDispatcher {
if (runners != null) {
this.runners = null;
- Exception shutdownException = new Exception("The MiniCluster is shutting down");
for (JobManagerRunner runner : runners) {
- runner.shutdown(shutdownException);
+ runner.shutdown();
}
}
}
@@ -171,9 +178,9 @@ public class MiniClusterJobDispatcher {
checkState(!shutdown, "mini cluster is shut down");
checkState(runners == null, "mini cluster can only execute one job at a time");
- OnCompletionActions onJobCompletion = new DetachedFinalizer(numJobManagers);
+ DetachedFinalizer finalizer = new DetachedFinalizer(numJobManagers);
- this.runners = startJobRunners(job, onJobCompletion);
+ this.runners = startJobRunners(job, finalizer, finalizer);
}
}
@@ -191,17 +198,17 @@ public class MiniClusterJobDispatcher {
checkNotNull(job);
LOG.info("Received job for blocking execution {} ({})", job.getName(), job.getJobID());
- final BlockingJobSync onJobCompletion = new BlockingJobSync(job.getJobID(), numJobManagers);
+ final BlockingJobSync sync = new BlockingJobSync(job.getJobID(), numJobManagers);
synchronized (lock) {
checkState(!shutdown, "mini cluster is shut down");
checkState(runners == null, "mini cluster can only execute one job at a time");
- this.runners = startJobRunners(job, onJobCompletion);
+ this.runners = startJobRunners(job, sync, sync);
}
try {
- return onJobCompletion.getResult();
+ return sync.getResult();
}
finally {
// always clear the status for the next job
@@ -209,24 +216,26 @@ public class MiniClusterJobDispatcher {
}
}
- private JobManagerRunner[] startJobRunners(JobGraph job, OnCompletionActions onCompletion) throws JobExecutionException {
+ private JobManagerRunner[] startJobRunners(
+ JobGraph job,
+ OnCompletionActions onCompletion,
+ FatalErrorHandler errorHandler) throws JobExecutionException {
LOG.info("Starting {} JobMaster(s) for job {} ({})", numJobManagers, job.getName(), job.getJobID());
JobManagerRunner[] runners = new JobManagerRunner[numJobManagers];
for (int i = 0; i < numJobManagers; i++) {
try {
runners[i] = new JobManagerRunner(job, configuration,
- rpcService, haServices, jobManagerServices, onCompletion);
+ rpcService, haServices, jobManagerServices, metricRegistry,
+ onCompletion, errorHandler);
runners[i].start();
}
catch (Throwable t) {
// shut down all the ones so far
- Exception shutdownCause = new Exception("Failed to properly start all mini cluster JobManagers", t);
-
for (int k = 0; k <= i; k++) {
try {
if (runners[i] != null) {
- runners[i].shutdown(shutdownCause);
+ runners[i].shutdown();
}
} catch (Throwable ignored) {
// silent shutdown
@@ -244,15 +253,15 @@ public class MiniClusterJobDispatcher {
// test methods to simulate job master failures
// ------------------------------------------------------------------------
- public void killJobMaster(int which) {
- checkArgument(which >= 0 && which < numJobManagers, "no such job master");
- checkState(!shutdown, "mini cluster is shut down");
-
- JobManagerRunner[] runners = this.runners;
+// checkState(runners != null, "mini cluster is not executing a job right now");
-
- runners[which].shutdown(new Throwable("kill JobManager"));
- }
+// public void killJobMaster(int which) {
+// checkArgument(which >= 0 && which < numJobManagers, "no such job master");
+// checkState(!shutdown, "mini cluster is shut down");
+//
+// JobManagerRunner[] runners = this.runners;
+// checkState(runners != null, "mini cluster it not executing a job right now");
+//
+// runners[which].shutdown(new Throwable("kill JobManager"));
+// }
// ------------------------------------------------------------------------
// utility classes
@@ -263,7 +272,7 @@ public class MiniClusterJobDispatcher {
* In the case of a high-availability test setup, there may be multiple runners.
* After that, it marks the mini cluster as ready to receive new jobs.
*/
- private class DetachedFinalizer implements OnCompletionActions {
+ private class DetachedFinalizer implements OnCompletionActions, FatalErrorHandler {
private final AtomicInteger numJobManagersToWaitFor;
@@ -308,7 +317,7 @@ public class MiniClusterJobDispatcher {
* That way it is guaranteed that after the blocking job submit call returns,
* the dispatcher is immediately free to accept another job.
*/
- private static class BlockingJobSync implements OnCompletionActions {
+ private static class BlockingJobSync implements OnCompletionActions, FatalErrorHandler {
private final JobID jobId;
http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/ClassloadingProps.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/ClassloadingProps.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/ClassloadingProps.java
index 520755d..572ba2f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/ClassloadingProps.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/ClassloadingProps.java
@@ -23,7 +23,6 @@ import org.apache.flink.runtime.blob.BlobKey;
import java.io.Serializable;
import java.net.URL;
import java.util.Collection;
-import java.util.List;
/**
* The response of classloading props request to JobManager.
http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java
deleted file mode 100644
index 42bfc71..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmaster.message;
-
-import java.io.Serializable;
-
-/**
- * The response of the dispose savepoint request to JobManager.
- */
-public abstract class DisposeSavepointResponse implements Serializable {
-
- private static final long serialVersionUID = 6008792963949369567L;
-
- public static class Success extends DisposeSavepointResponse implements Serializable {
-
- private static final long serialVersionUID = 1572462960008711415L;
- }
-
- public static class Failure extends DisposeSavepointResponse implements Serializable {
-
- private static final long serialVersionUID = -7505308325483022458L;
-
- private final Throwable cause;
-
- public Failure(final Throwable cause) {
- this.cause = cause;
- }
-
- public Throwable getCause() {
- return cause;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java
deleted file mode 100644
index 0b0edc5..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmaster.message;
-
-import org.apache.flink.api.common.JobID;
-
-import java.io.Serializable;
-
-/**
- * The response of the trigger savepoint request to JobManager.
- */
-public abstract class TriggerSavepointResponse implements Serializable {
-
- private static final long serialVersionUID = 3139327824611807707L;
-
- private final JobID jobID;
-
- public JobID getJobID() {
- return jobID;
- }
-
- public TriggerSavepointResponse(final JobID jobID) {
- this.jobID = jobID;
- }
-
- public static class Success extends TriggerSavepointResponse implements Serializable {
-
- private static final long serialVersionUID = -1100637460388881776L;
-
- private final String savepointPath;
-
- public Success(final JobID jobID, final String savepointPath) {
- super(jobID);
- this.savepointPath = savepointPath;
- }
-
- public String getSavepointPath() {
- return savepointPath;
- }
- }
-
- public static class Failure extends TriggerSavepointResponse implements Serializable {
-
- private static final long serialVersionUID = -1668479003490615139L;
-
- private final Throwable cause;
-
- public Failure(final JobID jobID, final Throwable cause) {
- super(jobID);
- this.cause = cause;
- }
-
- public Throwable getCause() {
- return cause;
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index 2052f98..4b9100a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -33,8 +33,8 @@ import java.util.concurrent.TimeUnit;
public interface RpcService {
/**
- * Return the address under which the rpc service can be reached. If the rpc service cannot be
- * contacted remotely, then it will return an empty string.
+ * Return the hostname or host address under which the rpc service can be reached.
+ * If the rpc service cannot be contacted remotely, then it will return an empty string.
*
* @return Address of the rpc service or empty string if local rpc service
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
index 72668d2..1b311e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
@@ -26,11 +26,16 @@ import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.util.Preconditions;
+import java.util.UUID;
+
/**
* Container class for JobManager specific communication utils used by the {@link TaskExecutor}.
*/
public class JobManagerConnection {
+ // Job master leader session id
+ private final UUID jobMasterLeaderId;
+
// Gateway to the job master
private final JobMasterGateway jobMasterGateway;
@@ -50,13 +55,15 @@ public class JobManagerConnection {
private final PartitionProducerStateChecker partitionStateChecker;
public JobManagerConnection(
- JobMasterGateway jobMasterGateway,
- TaskManagerActions taskManagerActions,
- CheckpointResponder checkpointResponder,
- LibraryCacheManager libraryCacheManager,
- ResultPartitionConsumableNotifier resultPartitionConsumableNotifier,
- PartitionProducerStateChecker partitionStateChecker) {
-
+ UUID jobMasterLeaderId,
+ JobMasterGateway jobMasterGateway,
+ TaskManagerActions taskManagerActions,
+ CheckpointResponder checkpointResponder,
+ LibraryCacheManager libraryCacheManager,
+ ResultPartitionConsumableNotifier resultPartitionConsumableNotifier,
+ PartitionProducerStateChecker partitionStateChecker)
+ {
+ this.jobMasterLeaderId = Preconditions.checkNotNull(jobMasterLeaderId);
this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
this.taskManagerActions = Preconditions.checkNotNull(taskManagerActions);
this.checkpointResponder = Preconditions.checkNotNull(checkpointResponder);
@@ -65,6 +72,10 @@ public class JobManagerConnection {
this.partitionStateChecker = Preconditions.checkNotNull(partitionStateChecker);
}
+ public UUID getJobMasterLeaderId() {
+ return jobMasterLeaderId;
+ }
+
public JobMasterGateway getJobManagerGateway() {
return jobMasterGateway;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 36f108e..2389291 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -18,37 +18,46 @@
package org.apache.flink.runtime.taskexecutor;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.executiongraph.JobInformation;
-import org.apache.flink.runtime.executiongraph.TaskInformation;
-import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.concurrent.ApplyFunction;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered;
-import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered;
+import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.taskexecutor.exceptions.CheckpointException;
import org.apache.flink.runtime.taskexecutor.exceptions.PartitionException;
import org.apache.flink.runtime.taskexecutor.exceptions.TaskException;
@@ -62,26 +71,16 @@ import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.rpc.RpcEndpoint;
-import org.apache.flink.runtime.rpc.RpcMethod;
-import org.apache.flink.runtime.rpc.RpcService;
-
import org.apache.flink.util.Preconditions;
-import java.util.HashSet;
-import java.util.Set;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -292,6 +291,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
tdd.getAttemptNumber());
InputSplitProvider inputSplitProvider = new RpcInputSplitProvider(
+ jobManagerConnection.getJobMasterLeaderId(),
jobManagerConnection.getJobManagerGateway(),
jobInformation.getJobId(),
taskInformation.getJobVertexId(),
@@ -605,10 +605,15 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
clearTasks();
}
- private void updateTaskExecutionState(final JobMasterGateway jobMasterGateway, final TaskExecutionState taskExecutionState) {
+ private void updateTaskExecutionState(
+ final UUID jobMasterLeaderId,
+ final JobMasterGateway jobMasterGateway,
+ final TaskExecutionState taskExecutionState)
+ {
final ExecutionAttemptID executionAttemptID = taskExecutionState.getID();
- Future<Acknowledge> futureAcknowledge = jobMasterGateway.updateTaskExecutionState(taskExecutionState);
+ Future<Acknowledge> futureAcknowledge = jobMasterGateway.updateTaskExecutionState(
+ jobMasterLeaderId, taskExecutionState);
futureAcknowledge.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
@Override
@@ -620,7 +625,11 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
}, getMainThreadExecutor());
}
- private void unregisterTaskAndNotifyFinalState(final JobMasterGateway jobMasterGateway, ExecutionAttemptID executionAttemptID) {
+ private void unregisterTaskAndNotifyFinalState(
+ final UUID jobMasterLeaderId,
+ final JobMasterGateway jobMasterGateway,
+ final ExecutionAttemptID executionAttemptID)
+ {
Task task = removeTask(executionAttemptID);
if (task != null) {
@@ -638,14 +647,15 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
AccumulatorSnapshot accumulatorSnapshot = task.getAccumulatorRegistry().getSnapshot();
updateTaskExecutionState(
- jobMasterGateway,
- new TaskExecutionState(
- task.getJobID(),
- task.getExecutionId(),
- task.getExecutionState(),
- task.getFailureCause(),
- accumulatorSnapshot,
- task.getMetricGroup().getIOMetricGroup().createSnapshot()));
+ jobMasterLeaderId,
+ jobMasterGateway,
+ new TaskExecutionState(
+ task.getJobID(),
+ task.getExecutionId(),
+ task.getExecutionState(),
+ task.getFailureCause(),
+ accumulatorSnapshot,
+ task.getMetricGroup().getIOMetricGroup().createSnapshot()));
} else {
log.error("Cannot find task with ID {} to unregister.", executionAttemptID);
}
@@ -687,11 +697,14 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
}
}
- private JobManagerConnection associateWithJobManager(JobMasterGateway jobMasterGateway, int blobPort) {
+ private JobManagerConnection associateWithJobManager(UUID jobMasterLeaderId,
+ JobMasterGateway jobMasterGateway, int blobPort)
+ {
+ Preconditions.checkNotNull(jobMasterLeaderId);
Preconditions.checkNotNull(jobMasterGateway);
Preconditions.checkArgument(blobPort > 0 || blobPort <= 65535, "Blob port is out of range.");
- TaskManagerActions taskManagerActions = new TaskManagerActionsImpl(jobMasterGateway);
+ TaskManagerActions taskManagerActions = new TaskManagerActionsImpl(jobMasterLeaderId, jobMasterGateway);
CheckpointResponder checkpointResponder = new RpcCheckpointResponder(jobMasterGateway);
@@ -704,19 +717,21 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
taskManagerConfiguration.getCleanupInterval());
ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = new RpcResultPartitionConsumableNotifier(
- jobMasterGateway,
- getRpcService().getExecutor(),
- taskManagerConfiguration.getTimeout());
+ jobMasterLeaderId,
+ jobMasterGateway,
+ getRpcService().getExecutor(),
+ taskManagerConfiguration.getTimeout());
- PartitionProducerStateChecker partitionStateChecker = new RpcPartitionStateChecker(jobMasterGateway);
+ PartitionProducerStateChecker partitionStateChecker = new RpcPartitionStateChecker(jobMasterLeaderId, jobMasterGateway);
return new JobManagerConnection(
- jobMasterGateway,
- taskManagerActions,
- checkpointResponder,
- libraryCacheManager,
- resultPartitionConsumableNotifier,
- partitionStateChecker);
+ jobMasterLeaderId,
+ jobMasterGateway,
+ taskManagerActions,
+ checkpointResponder,
+ libraryCacheManager,
+ resultPartitionConsumableNotifier,
+ partitionStateChecker);
}
private void disassociateFromJobManager(JobManagerConnection jobManagerConnection) throws IOException {
@@ -808,9 +823,11 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
}
private class TaskManagerActionsImpl implements TaskManagerActions {
+ private final UUID jobMasterLeaderId;
private final JobMasterGateway jobMasterGateway;
- private TaskManagerActionsImpl(JobMasterGateway jobMasterGateway) {
+ private TaskManagerActionsImpl(UUID jobMasterLeaderId, JobMasterGateway jobMasterGateway) {
+ this.jobMasterLeaderId = Preconditions.checkNotNull(jobMasterLeaderId);
this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
}
@@ -819,7 +836,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
runAsync(new Runnable() {
@Override
public void run() {
- unregisterTaskAndNotifyFinalState(jobMasterGateway, executionAttemptID);
+ unregisterTaskAndNotifyFinalState(jobMasterLeaderId, jobMasterGateway, executionAttemptID);
}
});
}
@@ -842,7 +859,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
@Override
public void updateTaskExecutionState(final TaskExecutionState taskExecutionState) {
- TaskExecutor.this.updateTaskExecutionState(jobMasterGateway, taskExecutionState);
+ TaskExecutor.this.updateTaskExecutionState(jobMasterLeaderId, jobMasterGateway, taskExecutionState);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
index 4850d63..3b9da48 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
@@ -31,7 +31,10 @@ import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
+import java.util.UUID;
+
public class RpcInputSplitProvider implements InputSplitProvider {
+ private final UUID jobMasterLeaderId;
private final JobMasterGateway jobMasterGateway;
private final JobID jobID;
private final JobVertexID jobVertexID;
@@ -39,11 +42,13 @@ public class RpcInputSplitProvider implements InputSplitProvider {
private final Time timeout;
public RpcInputSplitProvider(
+ UUID jobMasterLeaderId,
JobMasterGateway jobMasterGateway,
JobID jobID,
JobVertexID jobVertexID,
ExecutionAttemptID executionAttemptID,
Time timeout) {
+ this.jobMasterLeaderId = Preconditions.checkNotNull(jobMasterLeaderId);
this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
this.jobID = Preconditions.checkNotNull(jobID);
this.jobVertexID = Preconditions.checkNotNull(jobVertexID);
@@ -56,7 +61,8 @@ public class RpcInputSplitProvider implements InputSplitProvider {
public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException {
Preconditions.checkNotNull(userCodeClassLoader);
- Future<SerializedInputSplit> futureInputSplit = jobMasterGateway.requestNextInputSplit(jobVertexID, executionAttemptID);
+ Future<SerializedInputSplit> futureInputSplit = jobMasterGateway.requestNextInputSplit(
+ jobMasterLeaderId, jobVertexID, executionAttemptID);
try {
SerializedInputSplit serializedInputSplit = futureInputSplit.get(timeout.getSize(), timeout.getUnit());