Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2017/07/05 13:29:19 UTC

[GitHub] flink pull request #4260: [FLINK-7103] [dispatcher] Add skeletal structure o...

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/4260

    [FLINK-7103] [dispatcher] Add skeletal structure of Dispatcher component

    The Dispatcher is responsible for receiving job submissions, persisting the JobGraphs,
    spawning JobManagers to execute the jobs and recovering the jobs in case of a master
    failure. This commit adds the basic skeleton including the RPC call for job submission.
    
    Add cleanup logic for finished jobs
    
    Pass BlobService to JobManagerRunner
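The submission path described above (check the job's scheduling status, persist the JobGraph, start a JobManager, and undo the persistence only if that initial start fails) can be sketched with simplified stand-ins. All type names below (SchedulingStatus, Runner, SketchDispatcher) are hypothetical and are not Flink's SubmittedJobGraphStore, RunningJobsRegistry or JobManagerRunner APIs; the sketch only mirrors the control flow of submitJob in the diff.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical, simplified stand-ins; not Flink's real interfaces.
enum SchedulingStatus { PENDING, RUNNING, DONE }

interface Runner {
    void start() throws Exception;
}

class SketchDispatcher {
    final Set<String> persistedJobs = new HashSet<>();              // stands in for the HA job graph store
    final Map<String, SchedulingStatus> registry = new HashMap<>(); // stands in for the running jobs registry
    final Map<String, Runner> runners = new HashMap<>();
    private final Function<String, Runner> runnerFactory;

    SketchDispatcher(Function<String, Runner> runnerFactory) {
        this.runnerFactory = runnerFactory;
    }

    /** Accepts a job only while it is PENDING; persists it, then starts a runner. */
    void submitJob(String jobId) throws Exception {
        SchedulingStatus status = registry.getOrDefault(jobId, SchedulingStatus.PENDING);
        if (status != SchedulingStatus.PENDING) {
            throw new IllegalStateException("Job " + jobId + " is already in state " + status + '.');
        }

        // Persist first so that a master failover could recover the job.
        persistedJobs.add(jobId);

        Runner runner;
        try {
            runner = runnerFactory.apply(jobId);
            runner.start();
        } catch (Exception e) {
            // Roll back only the initial submission; a recovered job must stay persisted.
            persistedJobs.remove(jobId);
            throw new Exception("Could not start JobManager for " + jobId, e);
        }

        runners.put(jobId, runner);
    }
}
```

Persisting before starting the runner is what makes recovery possible; the rollback branch keeps a failed first submission from being resurrected after a failover.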


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink addDispatcher

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4260.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4260
    
----
commit cba1cb11a3eaaea3511120fa7864028a2c47e6a5
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-07-04T21:15:52Z

    [FLINK-7103] [dispatcher] Add skeletal structure of Dispatcher component
    
    The Dispatcher is responsible for receiving job submissions, persisting the JobGraphs,
    spawning JobManagers to execute the jobs and recovering the jobs in case of a master
    failure. This commit adds the basic skeleton including the RPC call for job submission.
    
    Add cleanup logic for finished jobs
    
    Pass BlobService to JobManagerRunner

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4260: [FLINK-7103] [dispatcher] Add skeletal structure o...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4260#discussion_r125880225
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
    @@ -0,0 +1,296 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.dispatcher;
    +
    +import org.apache.flink.api.common.JobExecutionResult;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.blob.BlobServer;
    +import org.apache.flink.runtime.blob.BlobService;
    +import org.apache.flink.runtime.client.JobSubmissionException;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.heartbeat.HeartbeatServices;
    +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    +import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobmanager.OnCompletionActions;
    +import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
    +import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
    +import org.apache.flink.runtime.jobmaster.JobManagerRunner;
    +import org.apache.flink.runtime.messages.Acknowledge;
    +import org.apache.flink.runtime.metrics.MetricRegistry;
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.apache.flink.runtime.rpc.RpcEndpoint;
    +import org.apache.flink.runtime.rpc.RpcMethod;
    +import org.apache.flink.runtime.rpc.RpcService;
    +import org.apache.flink.util.ExceptionUtils;
    +import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Base class for the Dispatcher component. The Dispatcher component is responsible
    + * for receiving job submissions, persisting them, spawning JobManagers to execute
    + * the jobs and to recover them in case of a master failure. Furthermore, it knows
    + * about the state of the Flink session cluster.
    + */
    +public abstract class Dispatcher extends RpcEndpoint<DispatcherGateway> {
    +
    +	protected static final Logger LOG = LoggerFactory.getLogger(Dispatcher.class);
    +
    +	public static final String DISPATCHER_NAME = "dispatcher";
    +
    +	private final SubmittedJobGraphStore submittedJobGraphStore;
    +	private final RunningJobsRegistry runningJobsRegistry;
    +
    +	private final HighAvailabilityServices highAvailabilityServices;
    +	private final BlobServer blobServer;
    +	private final HeartbeatServices heartbeatServices;
    +	private final MetricRegistry metricRegistry;
    +
    +	private final FatalErrorHandler fatalErrorHandler;
    +
    +	private final Map<JobID, JobManagerRunner> jobManagerRunners;
    +
    +	protected Dispatcher(
    +			RpcService rpcService,
    +			String endpointId,
    +			HighAvailabilityServices highAvailabilityServices,
    +			BlobServer blobServer,
    +			HeartbeatServices heartbeatServices,
    +			MetricRegistry metricRegistry,
    +			FatalErrorHandler fatalErrorHandler) throws Exception {
    +		super(rpcService, endpointId);
    +
    +		this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
    +		this.blobServer = Preconditions.checkNotNull(blobServer);
    +		this.heartbeatServices = Preconditions.checkNotNull(heartbeatServices);
    +		this.metricRegistry = Preconditions.checkNotNull(metricRegistry);
    +		this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
    +
    +		this.submittedJobGraphStore = highAvailabilityServices.getSubmittedJobGraphStore();
    +		this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry();
    +
    +		jobManagerRunners = new HashMap<>(16);
    +	}
    +
    +	//------------------------------------------------------
    +	// Lifecycle methods
    +	//------------------------------------------------------
    +
    +	@Override
    +	public void shutDown() throws Exception {
    +		Exception exception = null;
    +		// stop all currently running JobManagerRunners
    +		for (JobManagerRunner jobManagerRunner : jobManagerRunners.values()) {
    +			jobManagerRunner.shutdown();
    +		}
    +
    +		jobManagerRunners.clear();
    +
    +		try {
    +			submittedJobGraphStore.stop();
    +		} catch (Exception e) {
    +			exception = ExceptionUtils.firstOrSuppressed(e, exception);
    +		}
    +
    +		try {
    +			super.shutDown();
    +		} catch (Exception e) {
    +			exception = ExceptionUtils.firstOrSuppressed(e, exception);
    +		}
    +
    +		if (exception != null) {
    +			throw new FlinkException("Could not properly terminate the Dispatcher.", exception);
    +		}
    +	}
    +
    +	//------------------------------------------------------
    +	// RPCs
    +	//------------------------------------------------------
    +
    +	@RpcMethod
    +	public Acknowledge submitJob(JobGraph jobGraph) throws JobSubmissionException {
    +		final JobID jobId = jobGraph.getJobID();
    +
    +		log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
    +
    +		final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus;
    +
    +		try {
    +			jobSchedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobId);
    +		} catch (IOException e) {
    +			log.warn("Cannot retrieve job status for {}.", jobId, e);
    +			throw new JobSubmissionException(jobId, "Could not retrieve the job status.", e);
    +		}
    +
    +		if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.PENDING) {
    +			try {
    +				submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph, null));
    +			} catch (Exception e) {
    +				log.warn("Cannot persist JobGraph.", e);
    +				throw new JobSubmissionException(jobId, "Could not persist JobGraph.", e);
    +			}
    +
    +			final JobManagerRunner jobManagerRunner;
    +
    +			try {
    +				jobManagerRunner = createJobManagerRunner(
    +					ResourceID.generate(),
    +					jobGraph,
    +					null,
    +					getRpcService(),
    +					highAvailabilityServices,
    +					blobServer,
    +					heartbeatServices,
    +					metricRegistry,
    +					new DispatcherOnCompleteActions(jobGraph.getJobID()),
    +					fatalErrorHandler);
    +
    +				jobManagerRunner.start();
    +			} catch (Exception e) {
    +				try {
    +					// We should only remove a job from the submitted job graph store
    +					// if the initial submission failed. Never in case of a recovery
    +					submittedJobGraphStore.removeJobGraph(jobId);
    +				} catch (Throwable t) {
    +					log.warn("Cannot remove job graph from submitted job graph store.", t);
    +					e.addSuppressed(t);
    +				}
    +
    +				throw new JobSubmissionException(jobId, "Could not start JobManager.", e);
    +			}
    +
    +			jobManagerRunners.put(jobId, jobManagerRunner);
    +
    +			return Acknowledge.get();
    +		} else {
    +			throw new JobSubmissionException(jobId, "Job has already been submitted and " +
    +				"is currently in state " + jobSchedulingStatus + '.');
    +		}
    +	}
    +
    +	@RpcMethod
    +	Collection<JobID> listJobs() {
    +		// TODO: return proper list of running jobs
    +		return Collections.emptyList();
    --- End diff --
    
    I guess you're right. Will add it.



[GitHub] flink pull request #4260: [FLINK-7103] [dispatcher] Add skeletal structure o...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4260#discussion_r125880420
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
    @@ -0,0 +1,296 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.dispatcher;
    +
    +import org.apache.flink.api.common.JobExecutionResult;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.blob.BlobServer;
    +import org.apache.flink.runtime.blob.BlobService;
    +import org.apache.flink.runtime.client.JobSubmissionException;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.heartbeat.HeartbeatServices;
    +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    +import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobmanager.OnCompletionActions;
    +import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
    +import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
    +import org.apache.flink.runtime.jobmaster.JobManagerRunner;
    +import org.apache.flink.runtime.messages.Acknowledge;
    +import org.apache.flink.runtime.metrics.MetricRegistry;
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.apache.flink.runtime.rpc.RpcEndpoint;
    +import org.apache.flink.runtime.rpc.RpcMethod;
    +import org.apache.flink.runtime.rpc.RpcService;
    +import org.apache.flink.util.ExceptionUtils;
    +import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Base class for the Dispatcher component. The Dispatcher component is responsible
    + * for receiving job submissions, persisting them, spawning JobManagers to execute
    + * the jobs and to recover them in case of a master failure. Furthermore, it knows
    + * about the state of the Flink session cluster.
    + */
    +public abstract class Dispatcher extends RpcEndpoint<DispatcherGateway> {
    +
    +	protected static final Logger LOG = LoggerFactory.getLogger(Dispatcher.class);
    +
    +	public static final String DISPATCHER_NAME = "dispatcher";
    +
    +	private final SubmittedJobGraphStore submittedJobGraphStore;
    +	private final RunningJobsRegistry runningJobsRegistry;
    +
    +	private final HighAvailabilityServices highAvailabilityServices;
    +	private final BlobServer blobServer;
    +	private final HeartbeatServices heartbeatServices;
    +	private final MetricRegistry metricRegistry;
    +
    +	private final FatalErrorHandler fatalErrorHandler;
    +
    +	private final Map<JobID, JobManagerRunner> jobManagerRunners;
    +
    +	protected Dispatcher(
    +			RpcService rpcService,
    +			String endpointId,
    +			HighAvailabilityServices highAvailabilityServices,
    +			BlobServer blobServer,
    +			HeartbeatServices heartbeatServices,
    +			MetricRegistry metricRegistry,
    +			FatalErrorHandler fatalErrorHandler) throws Exception {
    +		super(rpcService, endpointId);
    +
    +		this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
    +		this.blobServer = Preconditions.checkNotNull(blobServer);
    +		this.heartbeatServices = Preconditions.checkNotNull(heartbeatServices);
    +		this.metricRegistry = Preconditions.checkNotNull(metricRegistry);
    +		this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
    +
    +		this.submittedJobGraphStore = highAvailabilityServices.getSubmittedJobGraphStore();
    +		this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry();
    +
    +		jobManagerRunners = new HashMap<>(16);
    +	}
    +
    +	//------------------------------------------------------
    +	// Lifecycle methods
    +	//------------------------------------------------------
    +
    +	@Override
    +	public void shutDown() throws Exception {
    +		Exception exception = null;
    +		// stop all currently running JobManagerRunners
    +		for (JobManagerRunner jobManagerRunner : jobManagerRunners.values()) {
    +			jobManagerRunner.shutdown();
    +		}
    +
    +		jobManagerRunners.clear();
    +
    +		try {
    +			submittedJobGraphStore.stop();
    +		} catch (Exception e) {
    +			exception = ExceptionUtils.firstOrSuppressed(e, exception);
    +		}
    +
    +		try {
    +			super.shutDown();
    +		} catch (Exception e) {
    +			exception = ExceptionUtils.firstOrSuppressed(e, exception);
    +		}
    +
    +		if (exception != null) {
    +			throw new FlinkException("Could not properly terminate the Dispatcher.", exception);
    +		}
    +	}
    +
    +	//------------------------------------------------------
    +	// RPCs
    +	//------------------------------------------------------
    +
    +	@RpcMethod
    +	public Acknowledge submitJob(JobGraph jobGraph) throws JobSubmissionException {
    +		final JobID jobId = jobGraph.getJobID();
    +
    +		log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
    +
    +		final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus;
    +
    +		try {
    +			jobSchedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobId);
    +		} catch (IOException e) {
    +			log.warn("Cannot retrieve job status for {}.", jobId, e);
    +			throw new JobSubmissionException(jobId, "Could not retrieve the job status.", e);
    +		}
    +
    +		if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.PENDING) {
    +			try {
    +				submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph, null));
    +			} catch (Exception e) {
    +				log.warn("Cannot persist JobGraph.", e);
    +				throw new JobSubmissionException(jobId, "Could not persist JobGraph.", e);
    +			}
    +
    +			final JobManagerRunner jobManagerRunner;
    +
    +			try {
    +				jobManagerRunner = createJobManagerRunner(
    +					ResourceID.generate(),
    +					jobGraph,
    +					null,
    +					getRpcService(),
    +					highAvailabilityServices,
    +					blobServer,
    +					heartbeatServices,
    +					metricRegistry,
    +					new DispatcherOnCompleteActions(jobGraph.getJobID()),
    +					fatalErrorHandler);
    +
    +				jobManagerRunner.start();
    +			} catch (Exception e) {
    +				try {
    +					// We should only remove a job from the submitted job graph store
    +					// if the initial submission failed. Never in case of a recovery
    +					submittedJobGraphStore.removeJobGraph(jobId);
    +				} catch (Throwable t) {
    +					log.warn("Cannot remove job graph from submitted job graph store.", t);
    +					e.addSuppressed(t);
    +				}
    +
    +				throw new JobSubmissionException(jobId, "Could not start JobManager.", e);
    +			}
    +
    +			jobManagerRunners.put(jobId, jobManagerRunner);
    +
    +			return Acknowledge.get();
    +		} else {
    +			throw new JobSubmissionException(jobId, "Job has already been submitted and " +
    +				"is currently in state " + jobSchedulingStatus + '.');
    +		}
    +	}
    +
    +	@RpcMethod
    +	Collection<JobID> listJobs() {
    +		// TODO: return proper list of running jobs
    +		return Collections.emptyList();
    +	}
    +
    +	/**
    +	 * Cleans up the job related data from the dispatcher. If cleanupHA is true, then
    +	 * the data will also be removed from HA.
    +	 *
    +	 * @param jobId JobID identifying the job to clean up
    +	 * @param cleanupHA True iff HA data shall also be cleaned up
    +	 */
    +	private void removeJob(JobID jobId, boolean cleanupHA) throws Exception {
    +		JobManagerRunner jobManagerRunner = jobManagerRunners.remove(jobId);
    +
    +		if (jobManagerRunner != null) {
    +			jobManagerRunner.shutdown();
    +		}
    +
    +		if (cleanupHA) {
    +			submittedJobGraphStore.removeJobGraph(jobId);
    +		}
    +
    +		// TODO: remove job related files from blob server
    +	}
    +
    +	protected abstract JobManagerRunner createJobManagerRunner(
    +		ResourceID resourceId,
    +		JobGraph jobGraph,
    +		Configuration configuration,
    +		RpcService rpcService,
    +		HighAvailabilityServices highAvailabilityServices,
    +		BlobService blobService,
    +		HeartbeatServices heartbeatServices,
    +		MetricRegistry metricRegistry,
    +		OnCompletionActions onCompleteActions,
    +		FatalErrorHandler fatalErrorHandler) throws Exception;
    +
    +	//------------------------------------------------------
    +	// Utility classes
    +	//------------------------------------------------------
    +
    +	private class DispatcherOnCompleteActions implements OnCompletionActions {
    +
    +		private final JobID jobId;
    +
    +		private DispatcherOnCompleteActions(JobID jobId) {
    +			this.jobId = Preconditions.checkNotNull(jobId);
    +		}
    +
    +		@Override
    +		public void jobFinished(JobExecutionResult result) {
    +			LOG.info("Job {} finished.", jobId);
    +
    +			runAsync(new Runnable() {
    +				@Override
    +				public void run() {
    +					try {
    +						removeJob(jobId, true);
    +					} catch (Exception e) {
    +						LOG.warn("Could not properly remove job {} from the dispatcher.", jobId, e);
    +					}
    +				}
    +			});
    +		}
    +
    +		@Override
    +		public void jobFailed(Throwable cause) {
    +			LOG.info("Job {} failed.", jobId);
    +
    +			runAsync(new Runnable() {
    +				@Override
    +				public void run() {
    +					try {
    +						removeJob(jobId, true);
    +					} catch (Exception e) {
    +						LOG.warn("Could not properly remove job {} from the dispatcher.", jobId, e);
    +					}
    +				}
    +			});
    +		}
    +
    +		@Override
    +		public void jobFinishedByOther() {
    +			LOG.info("Job {} was finished by other.", jobId);
    --- End diff --
    
    This is called if there are multiple JobMasters (e.g. when running standby masters) and another JobMaster has finished the job. This implementation of the Dispatcher won't need it, since we only start one JobMaster per job.
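A minimal sketch of how the three completion callbacks could map onto the removeJob(jobId, cleanupHA) helper from the diff. The interface and class names are hypothetical, jobId is passed as a parameter for brevity (the real DispatcherOnCompleteActions is created per job), and the choice to skip HA cleanup in jobFinishedByOther is an assumption: the diff is cut off before that method's body.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mirror of the three OnCompletionActions callbacks; not Flink's interface.
interface CompletionCallbacks {
    void jobFinished(String jobId);
    void jobFailed(String jobId, Throwable cause);
    void jobFinishedByOther(String jobId);
}

class RecordingDispatcher implements CompletionCallbacks {
    // Records "jobId:cleanupHA" instead of really shutting runners down.
    final List<String> removals = new ArrayList<>();

    private void removeJob(String jobId, boolean cleanupHA) {
        removals.add(jobId + ":" + cleanupHA);
    }

    @Override
    public void jobFinished(String jobId) {
        removeJob(jobId, true);
    }

    @Override
    public void jobFailed(String jobId, Throwable cause) {
        removeJob(jobId, true);
    }

    // Assumption: another (e.g. standby) JobMaster finished the job and owns the HA
    // cleanup, so only local state is dropped. With one JobMaster per job, this path
    // is not expected to be taken.
    @Override
    public void jobFinishedByOther(String jobId) {
        removeJob(jobId, false);
    }
}
```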



[GitHub] flink pull request #4260: [FLINK-7103] [dispatcher] Add skeletal structure o...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4260#discussion_r125880604
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java ---
    @@ -44,9 +46,9 @@
     	 * @param jobGraph The submitted {@link JobGraph}
     	 * @param jobInfo  The {@link JobInfo}
     	 */
    -	public SubmittedJobGraph(JobGraph jobGraph, JobInfo jobInfo) {
    +	public SubmittedJobGraph(JobGraph jobGraph, @Nullable JobInfo jobInfo) {
     		this.jobGraph = checkNotNull(jobGraph, "Job graph");
    -		this.jobInfo = checkNotNull(jobInfo, "Job info");
    --- End diff --
    
    Yes, I think so.
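The change above keeps the null check on the job graph but drops it for the job info, which the Dispatcher passes as null (new SubmittedJobGraph(jobGraph, null)). A simplified, hypothetical stand-in for that constructor contract, using String/Object in place of JobGraph/JobInfo and a comment in place of the @Nullable annotation:

```java
import java.util.Objects;

// Hypothetical stand-in for SubmittedJobGraph after the change: the job graph is
// still mandatory, but the job info may be null.
final class SubmittedGraphSketch {
    private final String jobGraph;
    private final Object jobInfo; // nullable (@Nullable in the real diff)

    SubmittedGraphSketch(String jobGraph, Object jobInfo) {
        this.jobGraph = Objects.requireNonNull(jobGraph, "Job graph");
        this.jobInfo = jobInfo; // no check: null is an accepted value
    }

    boolean hasJobInfo() {
        return jobInfo != null;
    }

    String getJobGraph() {
        return jobGraph;
    }
}
```

Callers reading the job info then have to handle the null case explicitly, which is what the @Nullable annotation documents.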



[GitHub] flink pull request #4260: [FLINK-7103] [dispatcher] Add skeletal structure o...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4260#discussion_r125862145
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
    @@ -0,0 +1,296 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.dispatcher;
    +
    +import org.apache.flink.api.common.JobExecutionResult;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.blob.BlobServer;
    +import org.apache.flink.runtime.blob.BlobService;
    +import org.apache.flink.runtime.client.JobSubmissionException;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.heartbeat.HeartbeatServices;
    +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    +import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobmanager.OnCompletionActions;
    +import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
    +import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
    +import org.apache.flink.runtime.jobmaster.JobManagerRunner;
    +import org.apache.flink.runtime.messages.Acknowledge;
    +import org.apache.flink.runtime.metrics.MetricRegistry;
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.apache.flink.runtime.rpc.RpcEndpoint;
    +import org.apache.flink.runtime.rpc.RpcMethod;
    +import org.apache.flink.runtime.rpc.RpcService;
    +import org.apache.flink.util.ExceptionUtils;
    +import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Base class for the Dispatcher component. The Dispatcher component is responsible
    + * for receiving job submissions, persisting them, spawning JobManagers to execute
    + * the jobs and to recover them in case of a master failure. Furthermore, it knows
    + * about the state of the Flink session cluster.
    + */
    +public abstract class Dispatcher extends RpcEndpoint<DispatcherGateway> {
    +
    +	protected static final Logger LOG = LoggerFactory.getLogger(Dispatcher.class);
    --- End diff --
    
    We already inherit a logger (`log`) from RpcEndpoint.
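The point of the comment is that a base class can create one logger for the concrete runtime class, so subclasses need no static LOG of their own; submitJob in the diff already uses the inherited `log`. The sketch below assumes the base class builds its logger from getClass(), and uses java.util.logging only to stay self-contained (the Dispatcher itself uses SLF4J). EndpointSketch and DispatcherSketch are hypothetical names.

```java
import java.util.logging.Logger;

// Hypothetical stand-in for RpcEndpoint: one logger per concrete runtime class.
abstract class EndpointSketch {
    protected final Logger log = Logger.getLogger(getClass().getName());
}

class DispatcherSketch extends EndpointSketch {
    // No separate static LOG field: the inherited logger is already named after this class.
    String loggerName() {
        log.fine("using the inherited logger");
        return log.getName();
    }
}
```

Because getClass() returns the runtime subclass, log output from the subclass is still attributed to DispatcherSketch, which is what a dedicated static logger would have provided.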



[GitHub] flink pull request #4260: [FLINK-7103] [dispatcher] Add skeletal structure o...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4260



[GitHub] flink issue #4260: [FLINK-7103] [dispatcher] Add skeletal structure of Dispa...

Posted by EronWright <gi...@git.apache.org>.
Github user EronWright commented on the issue:

    https://github.com/apache/flink/pull/4260
  
    My take on this skeleton is that the `Dispatcher` should not use the `JobManagerRunner` directly. My rationale is:
    1. The dispatcher should never execute user code; otherwise it cannot truly support multi-user scenarios.
    2. The runner is a concrete class that acts as a bootstrap for the JM and is meant to run in the same process as the JM. But the dispatcher must support remote JM execution.
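One way to read this suggestion is that the dispatcher depends only on an abstraction for launching a JobManager, with in-process and remote implementations behind it. The names below (JobManagerLauncher, LocalLauncher, RemoteLauncher, DecoupledDispatcher) are hypothetical and not part of Flink; this is a sketch of the dependency direction, not a proposed API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical abstraction: the dispatcher only asks for a JobManager to be launched
// and never loads or runs user code itself.
interface JobManagerLauncher {
    String launch(String jobId); // returns an address/handle of the launched JobManager
}

// In-process variant, roughly the role JobManagerRunner plays in the diff.
class LocalLauncher implements JobManagerLauncher {
    public String launch(String jobId) {
        return "local://" + jobId;
    }
}

// Remote variant (e.g. one container per job); here it only records the request.
class RemoteLauncher implements JobManagerLauncher {
    final List<String> requested = new ArrayList<>();

    public String launch(String jobId) {
        requested.add(jobId);
        return "remote://" + jobId;
    }
}

class DecoupledDispatcher {
    private final JobManagerLauncher launcher;

    DecoupledDispatcher(JobManagerLauncher launcher) {
        this.launcher = launcher;
    }

    String submit(String jobId) {
        return launcher.launch(jobId);
    }
}
```

The dispatcher code stays the same whether the JM runs locally or remotely; only the injected launcher changes.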



[GitHub] flink pull request #4260: [FLINK-7103] [dispatcher] Add skeletal structure o...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4260#discussion_r125863198
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
    @@ -0,0 +1,296 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.dispatcher;
    +
    +import org.apache.flink.api.common.JobExecutionResult;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.blob.BlobServer;
    +import org.apache.flink.runtime.blob.BlobService;
    +import org.apache.flink.runtime.client.JobSubmissionException;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.heartbeat.HeartbeatServices;
    +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    +import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobmanager.OnCompletionActions;
    +import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
    +import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
    +import org.apache.flink.runtime.jobmaster.JobManagerRunner;
    +import org.apache.flink.runtime.messages.Acknowledge;
    +import org.apache.flink.runtime.metrics.MetricRegistry;
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.apache.flink.runtime.rpc.RpcEndpoint;
    +import org.apache.flink.runtime.rpc.RpcMethod;
    +import org.apache.flink.runtime.rpc.RpcService;
    +import org.apache.flink.util.ExceptionUtils;
    +import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Base class for the Dispatcher component. The Dispatcher component is responsible
    + * for receiving job submissions, persisting them, spawning JobManagers to execute
    + * the jobs and to recover them in case of a master failure. Furthermore, it knows
    + * about the state of the Flink session cluster.
    + */
    +public abstract class Dispatcher extends RpcEndpoint<DispatcherGateway> {
    +
    +	protected static final Logger LOG = LoggerFactory.getLogger(Dispatcher.class);
    +
    +	public static final String DISPATCHER_NAME = "dispatcher";
    +
    +	private final SubmittedJobGraphStore submittedJobGraphStore;
    +	private final RunningJobsRegistry runningJobsRegistry;
    +
    +	private final HighAvailabilityServices highAvailabilityServices;
    +	private final BlobServer blobServer;
    +	private final HeartbeatServices heartbeatServices;
    +	private final MetricRegistry metricRegistry;
    +
    +	private final FatalErrorHandler fatalErrorHandler;
    +
    +	private final Map<JobID, JobManagerRunner> jobManagerRunners;
    +
    +	protected Dispatcher(
    +			RpcService rpcService,
    +			String endpointId,
    +			HighAvailabilityServices highAvailabilityServices,
    +			BlobServer blobServer,
    +			HeartbeatServices heartbeatServices,
    +			MetricRegistry metricRegistry,
    +			FatalErrorHandler fatalErrorHandler) throws Exception {
    +		super(rpcService, endpointId);
    +
    +		this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
    +		this.blobServer = Preconditions.checkNotNull(blobServer);
    +		this.heartbeatServices = Preconditions.checkNotNull(heartbeatServices);
    +		this.metricRegistry = Preconditions.checkNotNull(metricRegistry);
    +		this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
    +
    +		this.submittedJobGraphStore = highAvailabilityServices.getSubmittedJobGraphStore();
    +		this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry();
    +
    +		jobManagerRunners = new HashMap<>(16);
    +	}
    +
    +	//------------------------------------------------------
    +	// Lifecycle methods
    +	//------------------------------------------------------
    +
    +	@Override
    +	public void shutDown() throws Exception {
    +		Exception exception = null;
    +		// stop all currently running JobManagerRunners
    +		for (JobManagerRunner jobManagerRunner : jobManagerRunners.values()) {
    +			jobManagerRunner.shutdown();
    +		}
    +
    +		jobManagerRunners.clear();
    +
    +		try {
    +			submittedJobGraphStore.stop();
    +		} catch (Exception e) {
    +			exception = ExceptionUtils.firstOrSuppressed(e, exception);
    +		}
    +
    +		try {
    +			super.shutDown();
    +		} catch (Exception e) {
    +			exception = ExceptionUtils.firstOrSuppressed(e, exception);
    +		}
    +
    +		if (exception != null) {
    +			throw new FlinkException("Could not properly terminate the Dispatcher.", exception);
    +		}
    +	}
    +
    +	//------------------------------------------------------
    +	// RPCs
    +	//------------------------------------------------------
    +
    +	@RpcMethod
    +	public Acknowledge submitJob(JobGraph jobGraph) throws JobSubmissionException {
    +		final JobID jobId = jobGraph.getJobID();
    +
    +		log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
    +
    +		final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus;
    +
    +		try {
    +			jobSchedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobId);
    +		} catch (IOException e) {
    +			log.warn("Cannot retrieve job status for {}.", jobId, e);
    +			throw new JobSubmissionException(jobId, "Could not retrieve the job status.", e);
    +		}
    +
    +		if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.PENDING) {
    +			try {
    +				submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph, null));
    +			} catch (Exception e) {
    +				log.warn("Cannot persist JobGraph.", e);
    +				throw new JobSubmissionException(jobId, "Could not persist JobGraph.", e);
    +			}
    +
    +			final JobManagerRunner jobManagerRunner;
    +
    +			try {
    +				jobManagerRunner = createJobManagerRunner(
    +					ResourceID.generate(),
    +					jobGraph,
    +					null,
    +					getRpcService(),
    +					highAvailabilityServices,
    +					blobServer,
    +					heartbeatServices,
    +					metricRegistry,
    +					new DispatcherOnCompleteActions(jobGraph.getJobID()),
    +					fatalErrorHandler);
    +
    +				jobManagerRunner.start();
    +			} catch (Exception e) {
    +				try {
    +					// We should only remove a job from the submitted job graph store
    +					// if the initial submission failed. Never in case of a recovery
    +					submittedJobGraphStore.removeJobGraph(jobId);
    +				} catch (Throwable t) {
    +					log.warn("Cannot remove job graph from submitted job graph store.", t);
    +					e.addSuppressed(t);
    +				}
    +
    +				throw new JobSubmissionException(jobId, "Could not start JobManager.", e);
    +			}
    +
    +			jobManagerRunners.put(jobId, jobManagerRunner);
    +
    +			return Acknowledge.get();
    +		} else {
    +			throw new JobSubmissionException(jobId, "Job has already been submitted and " +
    +				"is currently in state " + jobSchedulingStatus + '.');
    +		}
    +	}
    +
    +	@RpcMethod
    +	Collection<JobID> listJobs() {
    +		// TODO: return proper list of running jobs
    +		return Collections.emptyList();
    +	}
    +
    +	/**
    +	 * Cleans up the job related data from the dispatcher. If cleanupHA is true, then
    +	 * the data will also be removed from HA.
    +	 *
    +	 * @param jobId JobID identifying the job to clean up
    +	 * @param cleanupHA True iff HA data shall also be cleaned up
    +	 */
    +	private void removeJob(JobID jobId, boolean cleanupHA) throws Exception {
    +		JobManagerRunner jobManagerRunner = jobManagerRunners.remove(jobId);
    +
    +		if (jobManagerRunner != null) {
    +			jobManagerRunner.shutdown();
    +		}
    +
    +		if (cleanupHA) {
    +			submittedJobGraphStore.removeJobGraph(jobId);
    +		}
    +
    +		// TODO: remove job related files from blob server
    +	}
    +
    +	protected abstract JobManagerRunner createJobManagerRunner(
    +		ResourceID resourceId,
    +		JobGraph jobGraph,
    +		Configuration configuration,
    +		RpcService rpcService,
    +		HighAvailabilityServices highAvailabilityServices,
    +		BlobService blobService,
    +		HeartbeatServices heartbeatServices,
    +		MetricRegistry metricRegistry,
    +		OnCompletionActions onCompleteActions,
    +		FatalErrorHandler fatalErrorHandler) throws Exception;
    +
    +	//------------------------------------------------------
    +	// Utility classes
    +	//------------------------------------------------------
    +
    +	private class DispatcherOnCompleteActions implements OnCompletionActions {
    +
    +		private final JobID jobId;
    +
    +		private DispatcherOnCompleteActions(JobID jobId) {
    +			this.jobId = Preconditions.checkNotNull(jobId);
    +		}
    +
    +		@Override
    +		public void jobFinished(JobExecutionResult result) {
    +			LOG.info("Job {} finished.", jobId);
    +
    +			runAsync(new Runnable() {
    +				@Override
    +				public void run() {
    +					try {
    +						removeJob(jobId, true);
    +					} catch (Exception e) {
    +						LOG.warn("Could not properly remove job {} from the dispatcher.", jobId, e);
    +					}
    +				}
    +			});
    +		}
    +
    +		@Override
    +		public void jobFailed(Throwable cause) {
    +			LOG.info("Job {} failed.", jobId);
    +
    +			runAsync(new Runnable() {
    +				@Override
    +				public void run() {
    +					try {
    +						removeJob(jobId, true);
    +					} catch (Exception e) {
    +						LOG.warn("Could not properly remove job {} from the dispatcher.", jobId, e);
    +					}
    +				}
    +			});
    +		}
    +
    +		@Override
    +		public void jobFinishedByOther() {
    +			LOG.info("Job {} was finished by other.", jobId);
    --- End diff --
    
    Which cases does this cover, i.e., who or what finished the job? Unfortunately, the OnCompletionActions interface has no documentation.
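The question turns on what each callback is meant to signal. The sketch below shows one way such an interface could document its contract, for illustration only. Several parts are assumptions and not the documented semantics of Flink's `OnCompletionActions`:

- the interface name `JobCompletionCallbacks`;
- the `String` result type, which stands in for `JobExecutionResult`;
- the meaning given to `jobFinishedByOther`, read here as "the job was already completed by a different leader, so this instance never executed it".

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, documented variant of a job completion callback, modeled on the
 * three methods that DispatcherOnCompleteActions implements in the diff above.
 * The semantics stated in these comments are assumptions for illustration.
 */
interface JobCompletionCallbacks {

    /** The job reached a successful terminal state; result stands in for its final value. */
    void jobFinished(String result);

    /** The job reached a failed terminal state because of the given cause. */
    void jobFailed(Throwable cause);

    /**
     * Assumption: the job had already been completed by another leader (for example,
     * a previous master in an HA setup), so this instance never executed it.
     */
    void jobFinishedByOther();
}

/** Records which callback fired so the caller can decide how to clean up. */
class RecordingCallbacks implements JobCompletionCallbacks {
    final List<String> events = new ArrayList<>();

    @Override
    public void jobFinished(String result) {
        events.add("finished:" + result);
    }

    @Override
    public void jobFailed(Throwable cause) {
        events.add("failed:" + cause.getMessage());
    }

    @Override
    public void jobFinishedByOther() {
        events.add("finishedByOther");
    }
}
```

A per-callback contract like this would also make it easier to decide, case by case, whether the cleanup path should remove HA data as well.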


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #4260: [FLINK-7103] [dispatcher] Add skeletal structure of Dispa...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/4260
  
    merging.


[GitHub] flink pull request #4260: [FLINK-7103] [dispatcher] Add skeletal structure o...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4260#discussion_r125880464
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
    @@ -0,0 +1,296 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.dispatcher;
    +
    +import org.apache.flink.api.common.JobExecutionResult;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.blob.BlobServer;
    +import org.apache.flink.runtime.blob.BlobService;
    +import org.apache.flink.runtime.client.JobSubmissionException;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.heartbeat.HeartbeatServices;
    +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    +import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobmanager.OnCompletionActions;
    +import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
    +import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
    +import org.apache.flink.runtime.jobmaster.JobManagerRunner;
    +import org.apache.flink.runtime.messages.Acknowledge;
    +import org.apache.flink.runtime.metrics.MetricRegistry;
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.apache.flink.runtime.rpc.RpcEndpoint;
    +import org.apache.flink.runtime.rpc.RpcMethod;
    +import org.apache.flink.runtime.rpc.RpcService;
    +import org.apache.flink.util.ExceptionUtils;
    +import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Base class for the Dispatcher component. The Dispatcher component is responsible
    + * for receiving job submissions, persisting them, spawning JobManagers to execute
    + * the jobs and to recover them in case of a master failure. Furthermore, it knows
    + * about the state of the Flink session cluster.
    + */
    +public abstract class Dispatcher extends RpcEndpoint<DispatcherGateway> {
    +
    +	protected static final Logger LOG = LoggerFactory.getLogger(Dispatcher.class);
    +
    +	public static final String DISPATCHER_NAME = "dispatcher";
    +
    +	private final SubmittedJobGraphStore submittedJobGraphStore;
    +	private final RunningJobsRegistry runningJobsRegistry;
    +
    +	private final HighAvailabilityServices highAvailabilityServices;
    +	private final BlobServer blobServer;
    +	private final HeartbeatServices heartbeatServices;
    +	private final MetricRegistry metricRegistry;
    +
    +	private final FatalErrorHandler fatalErrorHandler;
    +
    +	private final Map<JobID, JobManagerRunner> jobManagerRunners;
    +
    +	protected Dispatcher(
    +			RpcService rpcService,
    +			String endpointId,
    +			HighAvailabilityServices highAvailabilityServices,
    +			BlobServer blobServer,
    +			HeartbeatServices heartbeatServices,
    +			MetricRegistry metricRegistry,
    +			FatalErrorHandler fatalErrorHandler) throws Exception {
    +		super(rpcService, endpointId);
    +
    +		this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
    +		this.blobServer = Preconditions.checkNotNull(blobServer);
    +		this.heartbeatServices = Preconditions.checkNotNull(heartbeatServices);
    +		this.metricRegistry = Preconditions.checkNotNull(metricRegistry);
    +		this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
    +
    +		this.submittedJobGraphStore = highAvailabilityServices.getSubmittedJobGraphStore();
    +		this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry();
    +
    +		jobManagerRunners = new HashMap<>(16);
    +	}
    +
    +	//------------------------------------------------------
    +	// Lifecycle methods
    +	//------------------------------------------------------
    +
    +	@Override
    +	public void shutDown() throws Exception {
    +		Exception exception = null;
    +		// stop all currently running JobManagerRunners
    +		for (JobManagerRunner jobManagerRunner : jobManagerRunners.values()) {
    +			jobManagerRunner.shutdown();
    +		}
    +
    +		jobManagerRunners.clear();
    +
    +		try {
    +			submittedJobGraphStore.stop();
    +		} catch (Exception e) {
    +			exception = ExceptionUtils.firstOrSuppressed(e, exception);
    +		}
    +
    +		try {
    +			super.shutDown();
    +		} catch (Exception e) {
    +			exception = ExceptionUtils.firstOrSuppressed(e, exception);
    +		}
    +
    +		if (exception != null) {
    +			throw new FlinkException("Could not properly terminate the Dispatcher.", exception);
    +		}
    +	}
    +
    +	//------------------------------------------------------
    +	// RPCs
    +	//------------------------------------------------------
    +
    +	@RpcMethod
    +	public Acknowledge submitJob(JobGraph jobGraph) throws JobSubmissionException {
    +		final JobID jobId = jobGraph.getJobID();
    +
    +		log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
    +
    +		final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus;
    +
    +		try {
    +			jobSchedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobId);
    +		} catch (IOException e) {
    +			log.warn("Cannot retrieve job status for {}.", jobId, e);
    +			throw new JobSubmissionException(jobId, "Could not retrieve the job status.", e);
    +		}
    +
    +		if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.PENDING) {
    +			try {
    +				submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph, null));
    +			} catch (Exception e) {
    +				log.warn("Cannot persist JobGraph.", e);
    +				throw new JobSubmissionException(jobId, "Could not persist JobGraph.", e);
    +			}
    +
    +			final JobManagerRunner jobManagerRunner;
    +
    +			try {
    +				jobManagerRunner = createJobManagerRunner(
    +					ResourceID.generate(),
    +					jobGraph,
    +					null,
    +					getRpcService(),
    +					highAvailabilityServices,
    +					blobServer,
    +					heartbeatServices,
    +					metricRegistry,
    +					new DispatcherOnCompleteActions(jobGraph.getJobID()),
    +					fatalErrorHandler);
    +
    +				jobManagerRunner.start();
    +			} catch (Exception e) {
    +				try {
    +					// We should only remove a job from the submitted job graph store
    +					// if the initial submission failed. Never in case of a recovery
    +					submittedJobGraphStore.removeJobGraph(jobId);
    +				} catch (Throwable t) {
    +					log.warn("Cannot remove job graph from submitted job graph store.", t);
    +					e.addSuppressed(t);
    +				}
    +
    +				throw new JobSubmissionException(jobId, "Could not start JobManager.", e);
    +			}
    +
    +			jobManagerRunners.put(jobId, jobManagerRunner);
    +
    +			return Acknowledge.get();
    +		} else {
    +			throw new JobSubmissionException(jobId, "Job has already been submitted and " +
    +				"is currently in state " + jobSchedulingStatus + '.');
    +		}
    +	}
    +
    +	@RpcMethod
    +	Collection<JobID> listJobs() {
    +		// TODO: return proper list of running jobs
    +		return Collections.emptyList();
    +	}
    +
    +	/**
    +	 * Cleans up the job related data from the dispatcher. If cleanupHA is true, then
    +	 * the data will also be removed from HA.
    +	 *
    +	 * @param jobId JobID identifying the job to clean up
    +	 * @param cleanupHA True iff HA data shall also be cleaned up
    +	 */
    +	private void removeJob(JobID jobId, boolean cleanupHA) throws Exception {
    +		JobManagerRunner jobManagerRunner = jobManagerRunners.remove(jobId);
    +
    +		if (jobManagerRunner != null) {
    +			jobManagerRunner.shutdown();
    +		}
    +
    +		if (cleanupHA) {
    +			submittedJobGraphStore.removeJobGraph(jobId);
    +		}
    +
    +		// TODO: remove job related files from blob server
    +	}
    +
    +	protected abstract JobManagerRunner createJobManagerRunner(
    +		ResourceID resourceId,
    +		JobGraph jobGraph,
    +		Configuration configuration,
    +		RpcService rpcService,
    +		HighAvailabilityServices highAvailabilityServices,
    +		BlobService blobService,
    +		HeartbeatServices heartbeatServices,
    +		MetricRegistry metricRegistry,
    +		OnCompletionActions onCompleteActions,
    +		FatalErrorHandler fatalErrorHandler) throws Exception;
    +
    +	//------------------------------------------------------
    +	// Utility classes
    +	//------------------------------------------------------
    +
    +	private class DispatcherOnCompleteActions implements OnCompletionActions {
    +
    +		private final JobID jobId;
    +
    +		private DispatcherOnCompleteActions(JobID jobId) {
    +			this.jobId = Preconditions.checkNotNull(jobId);
    +		}
    +
    +		@Override
    +		public void jobFinished(JobExecutionResult result) {
    +			LOG.info("Job {} finished.", jobId);
    +
    +			runAsync(new Runnable() {
    --- End diff --
    
    I'll revisit this section.
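The section in question wraps the cleanup in an anonymous `Runnable` passed to `runAsync`, and `jobFailed` repeats the same block. The sketch below shows one way to factor both into a single helper. It assumes Java 8 lambdas are available and uses a single-threaded `ExecutorService` as a stand-in for the RPC endpoint's main thread. `MiniDispatcher`, `scheduleRemoval`, and `jobCount` are hypothetical names, and this is not Flink's `RpcEndpoint` API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical stand-in for the Dispatcher. Every state change runs on a single
 * "main thread" executor, mirroring how runAsync is used in the diff above.
 */
class MiniDispatcher {
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    // Only accessed from mainThread, so a plain HashMap is sufficient.
    private final Map<String, String> runners = new HashMap<>();

    void runAsync(Runnable runnable) {
        mainThread.execute(runnable);
    }

    void addJob(String jobId) {
        runAsync(() -> runners.put(jobId, "runner-" + jobId));
    }

    /** Shared by the "finished" and "failed" callbacks instead of two anonymous Runnables. */
    void scheduleRemoval(String jobId) {
        runAsync(() -> {
            try {
                removeJob(jobId);
            } catch (Exception e) {
                // Log and continue, like the LOG.warn in the quoted callbacks.
                System.err.println("Could not properly remove job " + jobId + ": " + e.getMessage());
            }
        });
    }

    private void removeJob(String jobId) throws Exception {
        if (runners.remove(jobId) == null) {
            throw new Exception("unknown job " + jobId);
        }
    }

    /** Reads the map on the main thread and waits for the answer. */
    int jobCount() {
        Callable<Integer> sizeTask = runners::size;
        try {
            return mainThread.submit(sizeTask).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("Could not query the job count", e);
        }
    }

    void shutdown() {
        mainThread.shutdown();
        try {
            mainThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Every access to `runners` goes through the same single-threaded executor, so the map needs no synchronization. That is presumably also why the quoted callbacks route their cleanup through `runAsync`.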


[GitHub] flink pull request #4260: [FLINK-7103] [dispatcher] Add skeletal structure o...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4260#discussion_r129608810
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
    @@ -0,0 +1,290 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.dispatcher;
    +
    +import org.apache.flink.api.common.JobExecutionResult;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.blob.BlobServer;
    +import org.apache.flink.runtime.blob.BlobService;
    +import org.apache.flink.runtime.client.JobSubmissionException;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.heartbeat.HeartbeatServices;
    +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    +import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobmanager.OnCompletionActions;
    +import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
    +import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
    +import org.apache.flink.runtime.jobmaster.JobManagerRunner;
    +import org.apache.flink.runtime.messages.Acknowledge;
    +import org.apache.flink.runtime.metrics.MetricRegistry;
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.apache.flink.runtime.rpc.RpcEndpoint;
    +import org.apache.flink.runtime.rpc.RpcMethod;
    +import org.apache.flink.runtime.rpc.RpcService;
    +import org.apache.flink.util.ExceptionUtils;
    +import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Base class for the Dispatcher component. The Dispatcher component is responsible
    + * for receiving job submissions, persisting them, spawning JobManagers to execute
    + * the jobs and to recover them in case of a master failure. Furthermore, it knows
    + * about the state of the Flink session cluster.
    + */
    +public abstract class Dispatcher extends RpcEndpoint<DispatcherGateway> {
    +
    +	public static final String DISPATCHER_NAME = "dispatcher";
    +
    +	private final SubmittedJobGraphStore submittedJobGraphStore;
    +	private final RunningJobsRegistry runningJobsRegistry;
    +
    +	private final HighAvailabilityServices highAvailabilityServices;
    +	private final BlobServer blobServer;
    +	private final HeartbeatServices heartbeatServices;
    +	private final MetricRegistry metricRegistry;
    +
    +	private final FatalErrorHandler fatalErrorHandler;
    +
    +	private final Map<JobID, JobManagerRunner> jobManagerRunners;
    +
    +	protected Dispatcher(
    +			RpcService rpcService,
    +			String endpointId,
    +			HighAvailabilityServices highAvailabilityServices,
    +			BlobServer blobServer,
    +			HeartbeatServices heartbeatServices,
    +			MetricRegistry metricRegistry,
    +			FatalErrorHandler fatalErrorHandler) throws Exception {
    +		super(rpcService, endpointId);
    +
    +		this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
    +		this.blobServer = Preconditions.checkNotNull(blobServer);
    +		this.heartbeatServices = Preconditions.checkNotNull(heartbeatServices);
    +		this.metricRegistry = Preconditions.checkNotNull(metricRegistry);
    +		this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
    +
    +		this.submittedJobGraphStore = highAvailabilityServices.getSubmittedJobGraphStore();
    +		this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry();
    +
    +		jobManagerRunners = new HashMap<>(16);
    +	}
    +
    +	//------------------------------------------------------
    +	// Lifecycle methods
    +	//------------------------------------------------------
    +
    +	@Override
    +	public void shutDown() throws Exception {
    +		Exception exception = null;
    +		// stop all currently running JobManagerRunners
    +		for (JobManagerRunner jobManagerRunner : jobManagerRunners.values()) {
    +			jobManagerRunner.shutdown();
    +		}
    +
    +		jobManagerRunners.clear();
    +
    +		try {
    +			submittedJobGraphStore.stop();
    +		} catch (Exception e) {
    +			exception = ExceptionUtils.firstOrSuppressed(e, exception);
    +		}
    +
    +		try {
    +			super.shutDown();
    +		} catch (Exception e) {
    +			exception = ExceptionUtils.firstOrSuppressed(e, exception);
    +		}
    +
    +		if (exception != null) {
    +			throw new FlinkException("Could not properly terminate the Dispatcher.", exception);
    +		}
    +	}
    +
    +	//------------------------------------------------------
    +	// RPCs
    +	//------------------------------------------------------
    +
    +	@RpcMethod
    +	public Acknowledge submitJob(JobGraph jobGraph) throws JobSubmissionException {
    +		final JobID jobId = jobGraph.getJobID();
    +
    +		log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
    +
    +		final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus;
    +
    +		try {
    +			jobSchedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobId);
    +		} catch (IOException e) {
    +			log.warn("Cannot retrieve job status for {}.", jobId, e);
    +			throw new JobSubmissionException(jobId, "Could not retrieve the job status.", e);
    +		}
    +
    +		if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.PENDING) {
    +			try {
    +				submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph, null));
    +			} catch (Exception e) {
    +				log.warn("Cannot persist JobGraph.", e);
    +				throw new JobSubmissionException(jobId, "Could not persist JobGraph.", e);
    +			}
    +
    +			final JobManagerRunner jobManagerRunner;
    +
    +			try {
    +				jobManagerRunner = createJobManagerRunner(
    +					ResourceID.generate(),
    +					jobGraph,
    +					null,
    +					getRpcService(),
    +					highAvailabilityServices,
    +					blobServer,
    +					heartbeatServices,
    +					metricRegistry,
    +					new DispatcherOnCompleteActions(jobGraph.getJobID()),
    +					fatalErrorHandler);
    +
    +				jobManagerRunner.start();
    +			} catch (Exception e) {
    +				try {
    +					// We should only remove a job from the submitted job graph store
    +					// if the initial submission failed. Never in case of a recovery
    +					submittedJobGraphStore.removeJobGraph(jobId);
    +				} catch (Throwable t) {
    +					log.warn("Cannot remove job graph from submitted job graph store.", t);
    +					e.addSuppressed(t);
    +				}
    +
    +				throw new JobSubmissionException(jobId, "Could not start JobManager.", e);
    +			}
    +
    +			jobManagerRunners.put(jobId, jobManagerRunner);
    +
    +			return Acknowledge.get();
    +		} else {
    +			throw new JobSubmissionException(jobId, "Job has already been submitted and " +
    +				"is currently in state " + jobSchedulingStatus + '.');
    +		}
    +	}
    +
    +	@RpcMethod
    +	public Collection<JobID> listJobs() {
    +		// TODO: return proper list of running jobs
    +		return jobManagerRunners.keySet();
    +	}
    +
    +	/**
    +	 * Cleans up the job related data from the dispatcher. If cleanupHA is true, then
    +	 * the data will also be removed from HA.
    +	 *
    +	 * @param jobId JobID identifying the job to clean up
    +	 * @param cleanupHA True iff HA data shall also be cleaned up
    +	 */
    +	private void removeJob(JobID jobId, boolean cleanupHA) throws Exception {
    +		JobManagerRunner jobManagerRunner = jobManagerRunners.remove(jobId);
    +
    +		if (jobManagerRunner != null) {
    +			jobManagerRunner.shutdown();
    +		}
    +
    +		if (cleanupHA) {
    +			submittedJobGraphStore.removeJobGraph(jobId);
    +		}
    +
    +		// TODO: remove job related files from blob server
    +	}
    +
    +	protected abstract JobManagerRunner createJobManagerRunner(
    +		ResourceID resourceId,
    +		JobGraph jobGraph,
    +		Configuration configuration,
    +		RpcService rpcService,
    +		HighAvailabilityServices highAvailabilityServices,
    +		BlobService blobService,
    +		HeartbeatServices heartbeatServices,
    +		MetricRegistry metricRegistry,
    +		OnCompletionActions onCompleteActions,
    +		FatalErrorHandler fatalErrorHandler) throws Exception;
    +
    +	//------------------------------------------------------
    +	// Utility classes
    +	//------------------------------------------------------
    +
    +	private class DispatcherOnCompleteActions implements OnCompletionActions {
    --- End diff --
    
    True. Will change it as part of the next PR.
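This point is separate from the one being conceded here. The revision quoted above changes `listJobs` to return `jobManagerRunners.keySet()` while keeping the `TODO`. `HashMap.keySet()` returns a live view: it reflects later additions and removals, and it does not implement `Serializable`, which could matter if the result has to cross a remote RPC boundary.

The sketch below contrasts that view with a copied snapshot. It uses a hypothetical `JobRegistry` with `String` job IDs in place of `JobID` and `JobManagerRunner`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical registry illustrating a live key view versus a snapshot of the job map. */
class JobRegistry {
    private final Map<String, Object> jobManagerRunners = new HashMap<>();

    void add(String jobId) {
        jobManagerRunners.put(jobId, new Object());
    }

    void remove(String jobId) {
        jobManagerRunners.remove(jobId);
    }

    /** Live view: later map changes show through, and the view is not Serializable. */
    Collection<String> listJobsLiveView() {
        return jobManagerRunners.keySet();
    }

    /** Snapshot: an independent ArrayList copy, unaffected by later map changes. */
    Collection<String> listJobsSnapshot() {
        return new ArrayList<>(jobManagerRunners.keySet());
    }
}
```

The copy costs one pass over the keys per call, which seems a reasonable price for a listing RPC. In exchange, callers are decoupled from a map that should only be touched on the endpoint's main thread.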


[GitHub] flink pull request #4260: [FLINK-7103] [dispatcher] Add skeletal structure o...

Posted by tedyu <gi...@git.apache.org>.
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4260#discussion_r127075409
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
    @@ -0,0 +1,290 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.dispatcher;
    +
    +import org.apache.flink.api.common.JobExecutionResult;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.blob.BlobServer;
    +import org.apache.flink.runtime.blob.BlobService;
    +import org.apache.flink.runtime.client.JobSubmissionException;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.heartbeat.HeartbeatServices;
    +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    +import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobmanager.OnCompletionActions;
    +import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
    +import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
    +import org.apache.flink.runtime.jobmaster.JobManagerRunner;
    +import org.apache.flink.runtime.messages.Acknowledge;
    +import org.apache.flink.runtime.metrics.MetricRegistry;
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.apache.flink.runtime.rpc.RpcEndpoint;
    +import org.apache.flink.runtime.rpc.RpcMethod;
    +import org.apache.flink.runtime.rpc.RpcService;
    +import org.apache.flink.util.ExceptionUtils;
    +import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Base class for the Dispatcher component. The Dispatcher component is responsible
    + * for receiving job submissions, persisting them, spawning JobManagers to execute
    + * the jobs and to recover them in case of a master failure. Furthermore, it knows
    + * about the state of the Flink session cluster.
    + */
    +public abstract class Dispatcher extends RpcEndpoint<DispatcherGateway> {
    +
    +	public static final String DISPATCHER_NAME = "dispatcher";
    +
    +	private final SubmittedJobGraphStore submittedJobGraphStore;
    +	private final RunningJobsRegistry runningJobsRegistry;
    +
    +	private final HighAvailabilityServices highAvailabilityServices;
    +	private final BlobServer blobServer;
    +	private final HeartbeatServices heartbeatServices;
    +	private final MetricRegistry metricRegistry;
    +
    +	private final FatalErrorHandler fatalErrorHandler;
    +
    +	private final Map<JobID, JobManagerRunner> jobManagerRunners;
    +
    +	protected Dispatcher(
    +			RpcService rpcService,
    +			String endpointId,
    +			HighAvailabilityServices highAvailabilityServices,
    +			BlobServer blobServer,
    +			HeartbeatServices heartbeatServices,
    +			MetricRegistry metricRegistry,
    +			FatalErrorHandler fatalErrorHandler) throws Exception {
    +		super(rpcService, endpointId);
    +
    +		this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
    +		this.blobServer = Preconditions.checkNotNull(blobServer);
    +		this.heartbeatServices = Preconditions.checkNotNull(heartbeatServices);
    +		this.metricRegistry = Preconditions.checkNotNull(metricRegistry);
    +		this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
    +
    +		this.submittedJobGraphStore = highAvailabilityServices.getSubmittedJobGraphStore();
    +		this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry();
    +
    +		jobManagerRunners = new HashMap<>(16);
    +	}
    +
    +	//------------------------------------------------------
    +	// Lifecycle methods
    +	//------------------------------------------------------
    +
    +	@Override
    +	public void shutDown() throws Exception {
    +		Exception exception = null;
    +		// stop all currently running JobManagerRunners
    +		for (JobManagerRunner jobManagerRunner : jobManagerRunners.values()) {
    +			jobManagerRunner.shutdown();
    +		}
    +
    +		jobManagerRunners.clear();
    +
    +		try {
    +			submittedJobGraphStore.stop();
    +		} catch (Exception e) {
    +			exception = ExceptionUtils.firstOrSuppressed(e, exception);
    +		}
    +
    +		try {
    +			super.shutDown();
    +		} catch (Exception e) {
    +			exception = ExceptionUtils.firstOrSuppressed(e, exception);
    +		}
    +
    +		if (exception != null) {
    +			throw new FlinkException("Could not properly terminate the Dispatcher.", exception);
    +		}
    +	}
    +
    +	//------------------------------------------------------
    +	// RPCs
    +	//------------------------------------------------------
    +
    +	@RpcMethod
    +	public Acknowledge submitJob(JobGraph jobGraph) throws JobSubmissionException {
    +		final JobID jobId = jobGraph.getJobID();
    +
    +		log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
    +
    +		final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus;
    +
    +		try {
    +			jobSchedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobId);
    +		} catch (IOException e) {
    +			log.warn("Cannot retrieve job status for {}.", jobId, e);
    +			throw new JobSubmissionException(jobId, "Could not retrieve the job status.", e);
    +		}
    +
    +		if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.PENDING) {
    +			try {
    +				submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph, null));
    +			} catch (Exception e) {
    +				log.warn("Cannot persist JobGraph.", e);
    +				throw new JobSubmissionException(jobId, "Could not persist JobGraph.", e);
    +			}
    +
    +			final JobManagerRunner jobManagerRunner;
    +
    +			try {
    +				jobManagerRunner = createJobManagerRunner(
    +					ResourceID.generate(),
    +					jobGraph,
    +					null,
    +					getRpcService(),
    +					highAvailabilityServices,
    +					blobServer,
    +					heartbeatServices,
    +					metricRegistry,
    +					new DispatcherOnCompleteActions(jobGraph.getJobID()),
    +					fatalErrorHandler);
    +
    +				jobManagerRunner.start();
    +			} catch (Exception e) {
    +				try {
    +					// We should only remove a job from the submitted job graph store
    +					// if the initial submission failed. Never in case of a recovery
    +					submittedJobGraphStore.removeJobGraph(jobId);
    +				} catch (Throwable t) {
    +					log.warn("Cannot remove job graph from submitted job graph store.", t);
    +					e.addSuppressed(t);
    +				}
    +
    +				throw new JobSubmissionException(jobId, "Could not start JobManager.", e);
    +			}
    +
    +			jobManagerRunners.put(jobId, jobManagerRunner);
    +
    +			return Acknowledge.get();
    +		} else {
    +			throw new JobSubmissionException(jobId, "Job has already been submitted and " +
    +				"is currently in state " + jobSchedulingStatus + '.');
    +		}
    +	}
    +
    +	@RpcMethod
    +	public Collection<JobID> listJobs() {
    +		// TODO: return proper list of running jobs
    +		return jobManagerRunners.keySet();
    +	}
    +
    +	/**
    +	 * Cleans up the job related data from the dispatcher. If cleanupHA is true, then
    +	 * the data will also be removed from HA.
    +	 *
    +	 * @param jobId JobID identifying the job to clean up
    +	 * @param cleanupHA True iff HA data shall also be cleaned up
    +	 */
    +	private void removeJob(JobID jobId, boolean cleanupHA) throws Exception {
    +		JobManagerRunner jobManagerRunner = jobManagerRunners.remove(jobId);
    +
    +		if (jobManagerRunner != null) {
    +			jobManagerRunner.shutdown();
    +		}
    +
    +		if (cleanupHA) {
    +			submittedJobGraphStore.removeJobGraph(jobId);
    +		}
    +
    +		// TODO: remove job related files from blob server
    +	}
    +
    +	protected abstract JobManagerRunner createJobManagerRunner(
    +		ResourceID resourceId,
    +		JobGraph jobGraph,
    +		Configuration configuration,
    +		RpcService rpcService,
    +		HighAvailabilityServices highAvailabilityServices,
    +		BlobService blobService,
    +		HeartbeatServices heartbeatServices,
    +		MetricRegistry metricRegistry,
    +		OnCompletionActions onCompleteActions,
    +		FatalErrorHandler fatalErrorHandler) throws Exception;
    +
    +	//------------------------------------------------------
    +	// Utility classes
    +	//------------------------------------------------------
    +
    +	private class DispatcherOnCompleteActions implements OnCompletionActions {
    --- End diff --
    
    nit: DispatcherOnCompletionActions would be a better name.
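    The submitJob RPC in the quoted diff persists the JobGraph before it starts a JobManagerRunner, and removes the persisted graph again if the runner cannot be started. Presumably this keeps a failed fresh submission from leaving a graph behind that recovery would later pick up. The following is a minimal, self-contained sketch of that persist-then-compensate ordering. InMemoryJobStore, RunnerFactory and SubmitSketch are hypothetical stand-ins, not Flink classes. Duplicate detection is also simplified to a lookup in the local runner map rather than a RunningJobsRegistry query.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for SubmittedJobGraphStore (not a Flink class).
class InMemoryJobStore {
    final Set<String> persisted = new HashSet<>();
    void put(String jobId) { persisted.add(jobId); }
    void remove(String jobId) { persisted.remove(jobId); }
}

// Hypothetical factory standing in for createJobManagerRunner(...) + start().
interface RunnerFactory {
    AutoCloseable createAndStart(String jobId) throws Exception;
}

class SubmitSketch {
    final InMemoryJobStore store = new InMemoryJobStore();
    final Map<String, AutoCloseable> runners = new HashMap<>();

    /** Persists first, starts a runner, and undoes the persist if starting fails. */
    void submit(String jobId, RunnerFactory factory) throws Exception {
        // Simplification: the real code asks the RunningJobsRegistry instead.
        if (runners.containsKey(jobId)) {
            throw new IllegalStateException("Job " + jobId + " has already been submitted.");
        }
        store.put(jobId); // 1. persist before starting anything
        final AutoCloseable runner;
        try {
            runner = factory.createAndStart(jobId); // 2. start the runner
        } catch (Exception e) {
            try {
                store.remove(jobId); // 3. compensate: only valid for a fresh submission
            } catch (RuntimeException cleanupFailure) {
                e.addSuppressed(cleanupFailure); // keep the original cause primary
            }
            throw new Exception("Could not start runner for " + jobId, e);
        }
        runners.put(jobId, runner); // 4. register only after a successful start
    }
}
```

    Because the runner is registered only after it has started, the runner map never contains a runner that failed to start.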



[GitHub] flink pull request #4260: [FLINK-7103] [dispatcher] Add skeletal structure o...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4260#discussion_r125880119
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
    @@ -0,0 +1,296 @@
    +package org.apache.flink.runtime.dispatcher;
    +
    +import org.apache.flink.api.common.JobExecutionResult;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.blob.BlobServer;
    +import org.apache.flink.runtime.blob.BlobService;
    +import org.apache.flink.runtime.client.JobSubmissionException;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.heartbeat.HeartbeatServices;
    +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    +import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobmanager.OnCompletionActions;
    +import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
    +import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
    +import org.apache.flink.runtime.jobmaster.JobManagerRunner;
    +import org.apache.flink.runtime.messages.Acknowledge;
    +import org.apache.flink.runtime.metrics.MetricRegistry;
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.apache.flink.runtime.rpc.RpcEndpoint;
    +import org.apache.flink.runtime.rpc.RpcMethod;
    +import org.apache.flink.runtime.rpc.RpcService;
    +import org.apache.flink.util.ExceptionUtils;
    +import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Base class for the Dispatcher component. The Dispatcher component is responsible
    + * for receiving job submissions, persisting them, spawning JobManagers to execute
    + * the jobs and to recover them in case of a master failure. Furthermore, it knows
    + * about the state of the Flink session cluster.
    + */
    +public abstract class Dispatcher extends RpcEndpoint<DispatcherGateway> {
    +
    +	protected static final Logger LOG = LoggerFactory.getLogger(Dispatcher.class);
    --- End diff --
    
    True, good point. Will remove it.
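    The quoted Dispatcher already calls log.info(...) and log.warn(...) without declaring a log field, which indicates that RpcEndpoint supplies an inherited logger. That makes the extra static LOG field redundant. Below is a sketch of the arrangement. It uses java.util.logging so that it runs without SLF4J on the classpath. EndpointBase and DispatcherLike are hypothetical names. Naming the logger after getClass(), so that subclasses log under their own class name, is an assumption about how such a base class might be written.

```java
import java.util.logging.Logger;

// Hypothetical stand-in for RpcEndpoint: exposes one protected, per-instance
// logger named after the concrete runtime class.
abstract class EndpointBase {
    protected final Logger log = Logger.getLogger(getClass().getName());
}

// The subclass reuses the inherited `log` instead of declaring its own static LOG.
class DispatcherLike extends EndpointBase {
    String loggerName() {
        return log.getName();
    }
}
```

    A static LOG bound to Dispatcher.class would record messages from a StandaloneDispatcher under the base class name. The inherited instance logger avoids that.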



[GitHub] flink pull request #4260: [FLINK-7103] [dispatcher] Add skeletal structure o...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4260#discussion_r125880547
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java ---
    @@ -0,0 +1,84 @@
    +package org.apache.flink.runtime.dispatcher;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.blob.BlobServer;
    +import org.apache.flink.runtime.blob.BlobService;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.heartbeat.HeartbeatServices;
    +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobmanager.OnCompletionActions;
    +import org.apache.flink.runtime.jobmaster.JobManagerRunner;
    +import org.apache.flink.runtime.jobmaster.JobMaster;
    +import org.apache.flink.runtime.metrics.MetricRegistry;
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.apache.flink.runtime.rpc.RpcService;
    +
    +/**
    + * Dispatcher implementation which spawns for each submitted {@link JobGraph}
    --- End diff --
    
    Will change it.



[GitHub] flink pull request #4260: [FLINK-7103] [dispatcher] Add skeletal structure o...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4260#discussion_r125862847
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
    @@ -0,0 +1,296 @@
    +package org.apache.flink.runtime.dispatcher;
    +
    +import org.apache.flink.api.common.JobExecutionResult;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.blob.BlobServer;
    +import org.apache.flink.runtime.blob.BlobService;
    +import org.apache.flink.runtime.client.JobSubmissionException;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.heartbeat.HeartbeatServices;
    +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    +import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobmanager.OnCompletionActions;
    +import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
    +import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
    +import org.apache.flink.runtime.jobmaster.JobManagerRunner;
    +import org.apache.flink.runtime.messages.Acknowledge;
    +import org.apache.flink.runtime.metrics.MetricRegistry;
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.apache.flink.runtime.rpc.RpcEndpoint;
    +import org.apache.flink.runtime.rpc.RpcMethod;
    +import org.apache.flink.runtime.rpc.RpcService;
    +import org.apache.flink.util.ExceptionUtils;
    +import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Base class for the Dispatcher component. The Dispatcher component is responsible
    + * for receiving job submissions, persisting them, spawning JobManagers to execute
    + * the jobs and to recover them in case of a master failure. Furthermore, it knows
    + * about the state of the Flink session cluster.
    + */
    +public abstract class Dispatcher extends RpcEndpoint<DispatcherGateway> {
    +
    +	protected static final Logger LOG = LoggerFactory.getLogger(Dispatcher.class);
    +
    +	public static final String DISPATCHER_NAME = "dispatcher";
    +
    +	private final SubmittedJobGraphStore submittedJobGraphStore;
    +	private final RunningJobsRegistry runningJobsRegistry;
    +
    +	private final HighAvailabilityServices highAvailabilityServices;
    +	private final BlobServer blobServer;
    +	private final HeartbeatServices heartbeatServices;
    +	private final MetricRegistry metricRegistry;
    +
    +	private final FatalErrorHandler fatalErrorHandler;
    +
    +	private final Map<JobID, JobManagerRunner> jobManagerRunners;
    +
    +	protected Dispatcher(
    +			RpcService rpcService,
    +			String endpointId,
    +			HighAvailabilityServices highAvailabilityServices,
    +			BlobServer blobServer,
    +			HeartbeatServices heartbeatServices,
    +			MetricRegistry metricRegistry,
    +			FatalErrorHandler fatalErrorHandler) throws Exception {
    +		super(rpcService, endpointId);
    +
    +		this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
    +		this.blobServer = Preconditions.checkNotNull(blobServer);
    +		this.heartbeatServices = Preconditions.checkNotNull(heartbeatServices);
    +		this.metricRegistry = Preconditions.checkNotNull(metricRegistry);
    +		this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
    +
    +		this.submittedJobGraphStore = highAvailabilityServices.getSubmittedJobGraphStore();
    +		this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry();
    +
    +		jobManagerRunners = new HashMap<>(16);
    +	}
    +
    +	//------------------------------------------------------
    +	// Lifecycle methods
    +	//------------------------------------------------------
    +
    +	@Override
    +	public void shutDown() throws Exception {
    +		Exception exception = null;
    +		// stop all currently running JobManagerRunners
    +		for (JobManagerRunner jobManagerRunner : jobManagerRunners.values()) {
    +			jobManagerRunner.shutdown();
    +		}
    +
    +		jobManagerRunners.clear();
    +
    +		try {
    +			submittedJobGraphStore.stop();
    +		} catch (Exception e) {
    +			exception = ExceptionUtils.firstOrSuppressed(e, exception);
    +		}
    +
    +		try {
    +			super.shutDown();
    +		} catch (Exception e) {
    +			exception = ExceptionUtils.firstOrSuppressed(e, exception);
    +		}
    +
    +		if (exception != null) {
    +			throw new FlinkException("Could not properly terminate the Dispatcher.", exception);
    +		}
    +	}
    +
    +	//------------------------------------------------------
    +	// RPCs
    +	//------------------------------------------------------
    +
    +	@RpcMethod
    +	public Acknowledge submitJob(JobGraph jobGraph) throws JobSubmissionException {
    +		final JobID jobId = jobGraph.getJobID();
    +
    +		log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
    +
    +		final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus;
    +
    +		try {
    +			jobSchedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobId);
    +		} catch (IOException e) {
    +			log.warn("Cannot retrieve job status for {}.", jobId, e);
    +			throw new JobSubmissionException(jobId, "Could not retrieve the job status.", e);
    +		}
    +
    +		if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.PENDING) {
    +			try {
    +				submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph, null));
    +			} catch (Exception e) {
    +				log.warn("Cannot persist JobGraph.", e);
    +				throw new JobSubmissionException(jobId, "Could not persist JobGraph.", e);
    +			}
    +
    +			final JobManagerRunner jobManagerRunner;
    +
    +			try {
    +				jobManagerRunner = createJobManagerRunner(
    +					ResourceID.generate(),
    +					jobGraph,
    +					null,
    +					getRpcService(),
    +					highAvailabilityServices,
    +					blobServer,
    +					heartbeatServices,
    +					metricRegistry,
    +					new DispatcherOnCompleteActions(jobGraph.getJobID()),
    +					fatalErrorHandler);
    +
    +				jobManagerRunner.start();
    +			} catch (Exception e) {
    +				try {
    +					// We should only remove a job from the submitted job graph store
    +					// if the initial submission failed. Never in case of a recovery
    +					submittedJobGraphStore.removeJobGraph(jobId);
    +				} catch (Throwable t) {
    +					log.warn("Cannot remove job graph from submitted job graph store.", t);
    +					e.addSuppressed(t);
    +				}
    +
    +				throw new JobSubmissionException(jobId, "Could not start JobManager.", e);
    +			}
    +
    +			jobManagerRunners.put(jobId, jobManagerRunner);
    +
    +			return Acknowledge.get();
    +		} else {
    +			throw new JobSubmissionException(jobId, "Job has already been submitted and " +
    +				"is currently in state " + jobSchedulingStatus + '.');
    +		}
    +	}
    +
    +	@RpcMethod
    +	Collection<JobID> listJobs() {
    +		// TODO: return proper list of running jobs
    +		return Collections.emptyList();
    --- End diff --
    
    Could we not already implement this by returning the key set of the jobManagerRunners map?
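    Taking the key set, as suggested, works. However, HashMap.keySet() returns a live view that changes as jobs are added or removed, and that view class is not Serializable. Assuming the RPC layer serializes return values for remote callers, copying the keys into a new list is the safer choice. In the sketch below, String stands in for JobID and Object for JobManagerRunner. JobListingSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

class JobListingSketch {
    // Stand-in for Map<JobID, JobManagerRunner>.
    private final Map<String, Object> jobManagerRunners = new HashMap<>();

    void register(String jobId) { jobManagerRunners.put(jobId, new Object()); }

    void remove(String jobId) { jobManagerRunners.remove(jobId); }

    /** Returns a point-in-time, serializable copy instead of the live keySet() view. */
    Collection<String> listJobs() {
        return new ArrayList<>(jobManagerRunners.keySet());
    }
}
```

    The copy is a snapshot. A job that finishes after listJobs() returns stays in the returned list, which is the expected semantics for an RPC reply.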



[GitHub] flink pull request #4260: [FLINK-7103] [dispatcher] Add skeletal structure o...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4260#discussion_r125863501
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java ---
    @@ -0,0 +1,84 @@
    +package org.apache.flink.runtime.dispatcher;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.blob.BlobServer;
    +import org.apache.flink.runtime.blob.BlobService;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.heartbeat.HeartbeatServices;
    +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobmanager.OnCompletionActions;
    +import org.apache.flink.runtime.jobmaster.JobManagerRunner;
    +import org.apache.flink.runtime.jobmaster.JobMaster;
    +import org.apache.flink.runtime.metrics.MetricRegistry;
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.apache.flink.runtime.rpc.RpcService;
    +
    +/**
    + * Dispatcher implementation which spawns for each submitted {@link JobGraph}
    --- End diff --
    
    Personally, I would move "a {@link JobMaster}" before "for each submitted".



[GitHub] flink pull request #4260: [FLINK-7103] [dispatcher] Add skeletal structure o...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4260#discussion_r125863649
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraph.java ---
    @@ -44,9 +46,9 @@
     	 * @param jobGraph The submitted {@link JobGraph}
     	 * @param jobInfo  The {@link JobInfo}
     	 */
    -	public SubmittedJobGraph(JobGraph jobGraph, JobInfo jobInfo) {
    +	public SubmittedJobGraph(JobGraph jobGraph, @Nullable JobInfo jobInfo) {
     		this.jobGraph = checkNotNull(jobGraph, "Job graph");
    -		this.jobInfo = checkNotNull(jobInfo, "Job info");
    --- End diff --
    
    Is the JobInfo class not applicable to FLIP-6?
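    The relaxed null check follows from the Dispatcher's submitJob, which constructs new SubmittedJobGraph(jobGraph, null). If jobInfo becomes optional, one way to keep the contract explicit is to keep the JobGraph check and expose the JobInfo through an Optional-returning getter. The sketch below is hypothetical. JobInfoStub and SubmittedJobGraphSketch are not Flink classes, and whether the real getter should return Optional is an open design choice.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical placeholder for JobInfo (not a Flink class).
final class JobInfoStub {
}

final class SubmittedJobGraphSketch {
    private final String jobGraph;     // still mandatory
    private final JobInfoStub jobInfo; // may be null, e.g. when submitted via the Dispatcher

    SubmittedJobGraphSketch(String jobGraph, JobInfoStub jobInfo) {
        this.jobGraph = Objects.requireNonNull(jobGraph, "Job graph");
        this.jobInfo = jobInfo; // intentionally not null-checked
    }

    String getJobGraph() {
        return jobGraph;
    }

    /** Forces callers to handle the missing-JobInfo case explicitly. */
    Optional<JobInfoStub> getJobInfo() {
        return Optional.ofNullable(jobInfo);
    }
}
```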



[GitHub] flink pull request #4260: [FLINK-7103] [dispatcher] Add skeletal structure o...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4260#discussion_r125863295
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
    @@ -0,0 +1,296 @@
    +package org.apache.flink.runtime.dispatcher;
    +
    +import org.apache.flink.api.common.JobExecutionResult;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.blob.BlobServer;
    +import org.apache.flink.runtime.blob.BlobService;
    +import org.apache.flink.runtime.client.JobSubmissionException;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.heartbeat.HeartbeatServices;
    +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    +import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobmanager.OnCompletionActions;
    +import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
    +import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
    +import org.apache.flink.runtime.jobmaster.JobManagerRunner;
    +import org.apache.flink.runtime.messages.Acknowledge;
    +import org.apache.flink.runtime.metrics.MetricRegistry;
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.apache.flink.runtime.rpc.RpcEndpoint;
    +import org.apache.flink.runtime.rpc.RpcMethod;
    +import org.apache.flink.runtime.rpc.RpcService;
    +import org.apache.flink.util.ExceptionUtils;
    +import org.apache.flink.util.FlinkException;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Base class for the Dispatcher component. The Dispatcher component is responsible
    + * for receiving job submissions, persisting them, spawning JobManagers to execute
    + * the jobs and to recover them in case of a master failure. Furthermore, it knows
    + * about the state of the Flink session cluster.
    + */
    +public abstract class Dispatcher extends RpcEndpoint<DispatcherGateway> {
    +
    +	protected static final Logger LOG = LoggerFactory.getLogger(Dispatcher.class);
    +
    +	public static final String DISPATCHER_NAME = "dispatcher";
    +
    +	private final SubmittedJobGraphStore submittedJobGraphStore;
    +	private final RunningJobsRegistry runningJobsRegistry;
    +
    +	private final HighAvailabilityServices highAvailabilityServices;
    +	private final BlobServer blobServer;
    +	private final HeartbeatServices heartbeatServices;
    +	private final MetricRegistry metricRegistry;
    +
    +	private final FatalErrorHandler fatalErrorHandler;
    +
    +	private final Map<JobID, JobManagerRunner> jobManagerRunners;
    +
    +	protected Dispatcher(
    +			RpcService rpcService,
    +			String endpointId,
    +			HighAvailabilityServices highAvailabilityServices,
    +			BlobServer blobServer,
    +			HeartbeatServices heartbeatServices,
    +			MetricRegistry metricRegistry,
    +			FatalErrorHandler fatalErrorHandler) throws Exception {
    +		super(rpcService, endpointId);
    +
    +		this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
    +		this.blobServer = Preconditions.checkNotNull(blobServer);
    +		this.heartbeatServices = Preconditions.checkNotNull(heartbeatServices);
    +		this.metricRegistry = Preconditions.checkNotNull(metricRegistry);
    +		this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
    +
    +		this.submittedJobGraphStore = highAvailabilityServices.getSubmittedJobGraphStore();
    +		this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry();
    +
    +		jobManagerRunners = new HashMap<>(16);
    +	}
    +
    +	//------------------------------------------------------
    +	// Lifecycle methods
    +	//------------------------------------------------------
    +
    +	@Override
    +	public void shutDown() throws Exception {
    +		Exception exception = null;
    +		// stop all currently running JobManagerRunners
    +		for (JobManagerRunner jobManagerRunner : jobManagerRunners.values()) {
    +			jobManagerRunner.shutdown();
    +		}
    +
    +		jobManagerRunners.clear();
    +
    +		try {
    +			submittedJobGraphStore.stop();
    +		} catch (Exception e) {
    +			exception = ExceptionUtils.firstOrSuppressed(e, exception);
    +		}
    +
    +		try {
    +			super.shutDown();
    +		} catch (Exception e) {
    +			exception = ExceptionUtils.firstOrSuppressed(e, exception);
    +		}
    +
    +		if (exception != null) {
    +			throw new FlinkException("Could not properly terminate the Dispatcher.", exception);
    +		}
    +	}
    +
    +	//------------------------------------------------------
    +	// RPCs
    +	//------------------------------------------------------
    +
    +	@RpcMethod
    +	public Acknowledge submitJob(JobGraph jobGraph) throws JobSubmissionException {
    +		final JobID jobId = jobGraph.getJobID();
    +
    +		log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
    +
    +		final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus;
    +
    +		try {
    +			jobSchedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobId);
    +		} catch (IOException e) {
    +			log.warn("Cannot retrieve job status for {}.", jobId, e);
    +			throw new JobSubmissionException(jobId, "Could not retrieve the job status.", e);
    +		}
    +
    +		if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.PENDING) {
    +			try {
    +				submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph, null));
    +			} catch (Exception e) {
    +				log.warn("Cannot persist JobGraph.", e);
    +				throw new JobSubmissionException(jobId, "Could not persist JobGraph.", e);
    +			}
    +
    +			final JobManagerRunner jobManagerRunner;
    +
    +			try {
    +				jobManagerRunner = createJobManagerRunner(
    +					ResourceID.generate(),
    +					jobGraph,
    +					null,
    +					getRpcService(),
    +					highAvailabilityServices,
    +					blobServer,
    +					heartbeatServices,
    +					metricRegistry,
    +					new DispatcherOnCompleteActions(jobGraph.getJobID()),
    +					fatalErrorHandler);
    +
    +				jobManagerRunner.start();
    +			} catch (Exception e) {
    +				try {
    +					// We should only remove a job from the submitted job graph store
    +					// if the initial submission failed. Never in case of a recovery
    +					submittedJobGraphStore.removeJobGraph(jobId);
    +				} catch (Throwable t) {
    +					log.warn("Cannot remove job graph from submitted job graph store.", t);
    +					e.addSuppressed(t);
    +				}
    +
    +				throw new JobSubmissionException(jobId, "Could not start JobManager.", e);
    +			}
    +
    +			jobManagerRunners.put(jobId, jobManagerRunner);
    +
    +			return Acknowledge.get();
    +		} else {
    +			throw new JobSubmissionException(jobId, "Job has already been submitted and " +
    +				"is currently in state " + jobSchedulingStatus + '.');
    +		}
    +	}
    +
    +	@RpcMethod
    +	Collection<JobID> listJobs() {
    +		// TODO: return proper list of running jobs
    +		return Collections.emptyList();
    +	}
    +
    +	/**
    +	 * Cleans up the job related data from the dispatcher. If cleanupHA is true, then
    +	 * the data will also be removed from HA.
    +	 *
    +	 * @param jobId JobID identifying the job to clean up
    +	 * @param cleanupHA True iff HA data shall also be cleaned up
    +	 */
    +	private void removeJob(JobID jobId, boolean cleanupHA) throws Exception {
    +		JobManagerRunner jobManagerRunner = jobManagerRunners.remove(jobId);
    +
    +		if (jobManagerRunner != null) {
    +			jobManagerRunner.shutdown();
    +		}
    +
    +		if (cleanupHA) {
    +			submittedJobGraphStore.removeJobGraph(jobId);
    +		}
    +
    +		// TODO: remove job related files from blob server
    +	}
    +
    +	protected abstract JobManagerRunner createJobManagerRunner(
    +		ResourceID resourceId,
    +		JobGraph jobGraph,
    +		Configuration configuration,
    +		RpcService rpcService,
    +		HighAvailabilityServices highAvailabilityServices,
    +		BlobService blobService,
    +		HeartbeatServices heartbeatServices,
    +		MetricRegistry metricRegistry,
    +		OnCompletionActions onCompleteActions,
    +		FatalErrorHandler fatalErrorHandler) throws Exception;
    +
    +	//------------------------------------------------------
    +	// Utility classes
    +	//------------------------------------------------------
    +
    +	private class DispatcherOnCompleteActions implements OnCompletionActions {
    +
    +		private final JobID jobId;
    +
    +		private DispatcherOnCompleteActions(JobID jobId) {
    +			this.jobId = Preconditions.checkNotNull(jobId);
    +		}
    +
    +		@Override
    +		public void jobFinished(JobExecutionResult result) {
    +			LOG.info("Job {} finished.", jobId);
    +
    +			runAsync(new Runnable() {
    --- End diff --
    
    We could move this Runnable into a separate class, or create it in a dedicated helper method, to reduce the duplication.
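    A minimal sketch of the suggestion in plain Java, with no Flink dependencies. The diff is cut off before the Runnable's body, so everything here is an assumption for illustration: String job IDs instead of JobID, runAsync executing inline, the hypothetical names createRemoveJobRunnable and onFatalError, which completion callbacks exist, and which cleanupHA flag each one passes to removeJob. The point is only that every callback shares one factory method instead of its own anonymous Runnable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

/** Sketch (hypothetical): one shared factory instead of several anonymous Runnables. */
class DispatcherCleanupSketch {

    // Stand-in for RpcEndpoint#runAsync; this sketch simply runs the task inline.
    private final Executor mainThreadExecutor = Runnable::run;

    // Records removals so the effect is observable in this sketch.
    final List<String> removedJobs = new ArrayList<>();

    void runAsync(Runnable runnable) {
        mainThreadExecutor.execute(runnable);
    }

    // Simplified, hypothetical version of Dispatcher#removeJob(JobID, boolean).
    void removeJob(String jobId, boolean cleanupHA) throws Exception {
        removedJobs.add(jobId + (cleanupHA ? " (HA cleaned)" : ""));
    }

    // Hypothetical stand-in for FatalErrorHandler#onFatalError.
    void onFatalError(Throwable t) {
        throw new IllegalStateException(t);
    }

    /** Creates the cleanup task shared by all completion callbacks. */
    Runnable createRemoveJobRunnable(String jobId, boolean cleanupHA) {
        return () -> {
            try {
                removeJob(jobId, cleanupHA);
            } catch (Exception e) {
                onFatalError(e);
            }
        };
    }

    // The callbacks now differ only in the arguments they pass (flags are assumptions).
    void jobFinished(String jobId) { runAsync(createRemoveJobRunnable(jobId, true)); }

    void jobFailed(String jobId) { runAsync(createRemoveJobRunnable(jobId, true)); }

    void jobFinishedByOther(String jobId) { runAsync(createRemoveJobRunnable(jobId, false)); }

    public static void main(String[] args) {
        DispatcherCleanupSketch dispatcher = new DispatcherCleanupSketch();
        dispatcher.jobFinished("job-1");
        dispatcher.jobFinishedByOther("job-2");
        System.out.println(dispatcher.removedJobs); // prints [job-1 (HA cleaned), job-2]
    }
}
```

    A named static inner class taking (jobId, cleanupHA) would work equally well; the factory method keeps access to the enclosing Dispatcher's removeJob without extra wiring.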

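    The shutDown method in the diff wraps each cleanup step in its own try/catch and folds failures together with ExceptionUtils.firstOrSuppressed, so a failing step does not skip the remaining ones and the first error is rethrown with later ones attached as suppressed exceptions. Below is a plain-Java sketch of that pattern; firstOrSuppressed here is a local stand-in written for illustration, and the Stoppable interface and stopAll method are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch: stop every component, keep the first failure, suppress the rest. */
class ShutdownAggregationSketch {

    /** Local stand-in for Flink's ExceptionUtils.firstOrSuppressed. */
    static <T extends Throwable> T firstOrSuppressed(T newException, T previous) {
        if (previous == null) {
            return newException;
        }
        if (previous != newException) {
            previous.addSuppressed(newException);
        }
        return previous;
    }

    /** A component that may fail when stopped (hypothetical). */
    interface Stoppable {
        void stop() throws Exception;
    }

    /** Attempts to stop all components, then rethrows a single aggregated exception. */
    static void stopAll(List<Stoppable> components) throws Exception {
        Exception exception = null;
        for (Stoppable component : components) {
            try {
                component.stop();
            } catch (Exception e) {
                exception = firstOrSuppressed(e, exception);
            }
        }
        if (exception != null) {
            throw new Exception("Could not properly terminate all components.", exception);
        }
    }

    public static void main(String[] args) {
        int[] stopped = {0};
        List<Stoppable> components = Arrays.asList(
            () -> { stopped[0]++; throw new IllegalStateException("first"); },
            () -> stopped[0]++,
            () -> { stopped[0]++; throw new IllegalStateException("second"); });
        try {
            stopAll(components);
        } catch (Exception e) {
            Throwable cause = e.getCause();
            // prints: 3 stopped, cause=first, suppressed=second
            System.out.println(stopped[0] + " stopped, cause=" + cause.getMessage()
                + ", suppressed=" + cause.getSuppressed()[0].getMessage());
        }
    }
}
```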
