You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/19 04:42:22 UTC

[flink] 11/15: [FLINK-XXXXX] Add the Yarn/Standalone Executors

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executors-clean
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fc182735564bf6a6a0fdd687ec260783ceeda8e2
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Nov 18 17:10:15 2019 +0100

    [FLINK-XXXXX] Add the Yarn/Standalone Executors
---
 .../deployment/AbstractJobClusterExecutor.java     | 68 ++++++++++++++++-
 .../deployment/AbstractSessionClusterExecutor.java | 65 ++++++++++++++++-
 .../flink/client/deployment/ExecutorUtils.java     | 55 ++++++++++++++
 .../flink/client/deployment/JobClientImpl.java     | 85 +++++++++++++++++++++-
 .../StandaloneSessionClusterExecutor.java          | 38 ++++++++++
 .../StandaloneSessionClusterExecutorFactory.java   | 45 ++++++++++++
 ...org.apache.flink.core.execution.ExecutorFactory | 16 ++++
 .../yarn/executors/YarnJobClusterExecutor.java     | 41 +++++++++++
 .../executors/YarnJobClusterExecutorFactory.java   | 45 ++++++++++++
 .../yarn/executors/YarnSessionClusterExecutor.java | 39 ++++++++++
 .../YarnSessionClusterExecutorFactory.java         | 45 ++++++++++++
 ...org.apache.flink.core.execution.ExecutorFactory | 17 +++++
 12 files changed, 556 insertions(+), 3 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
index 14a93bc..bca403a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
@@ -1,4 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.flink.client.deployment;
 
-public class AbstractJobClusterExecutor {
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An abstract {@link Executor} used to execute {@link Pipeline pipelines} on dedicated (per-job) clusters.
+ *
+ * <p>Each call to {@link #execute(Pipeline, Configuration)} translates the pipeline into a {@link JobGraph}
+ * and deploys a job cluster for it through the {@link ClusterDescriptor} obtained from the client factory.
+ *
+ * @param <ClusterID> the type of the id of the cluster.
+ * @param <ClientFactory> the type of the {@link ClusterClientFactory} used to create/retrieve a client to the target cluster.
+ */
+@Internal
+public class AbstractJobClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements Executor {
+
+	private static final Logger LOG = LoggerFactory.getLogger(AbstractJobClusterExecutor.class);
+
+	private final ClientFactory clusterClientFactory;
+
+	/**
+	 * Creates an executor backed by the given client factory.
+	 *
+	 * @param clusterClientFactory the factory providing the cluster descriptor and the cluster specification;
+	 *                             must not be {@code null}.
+	 */
+	public AbstractJobClusterExecutor(@Nonnull final ClientFactory clusterClientFactory) {
+		this.clusterClientFactory = checkNotNull(clusterClientFactory);
+	}
+
+	/**
+	 * Translates the pipeline into a {@link JobGraph} and deploys a job cluster running it.
+	 *
+	 * <p>The cluster descriptor is created from the configuration and closed again before this method returns.
+	 * Whether the deployment is detached is read from the configuration via {@link ExecutionConfigAccessor}.
+	 *
+	 * @param pipeline the pipeline to execute.
+	 * @param configuration the configuration used to build the job graph, the cluster descriptor and the cluster specification.
+	 * @return an already completed future holding a {@link JobClient} bound to the deployed cluster and the job's id.
+	 * @throws Exception if the translation, the deployment or closing the cluster descriptor fails.
+	 */
+	@Override
+	public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception {
+		final JobGraph jobGraph = ExecutorUtils.getJobGraph(pipeline, configuration);
+
+		try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
+			final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
+
+			final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
+
+			final ClusterClient<ClusterID> clusterClient = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode());
+			// Parameterized logging: the message is only assembled when INFO is enabled.
+			LOG.info("Job has been submitted with JobID {}", jobGraph.getJobID());
+			return CompletableFuture.completedFuture(new JobClientImpl<>(clusterClient, jobGraph.getJobID()));
+		}
+	}
 }
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java
index ab8cc01..413ff58 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java
@@ -1,4 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.flink.client.deployment;
 
-public class AbstractSessionClusterExecutor {
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Base {@link Executor} that runs {@link Pipeline pipelines} on an already existing (session) cluster.
+ *
+ * <p>The target cluster is identified by the cluster id that the client factory extracts from the
+ * configuration; a client to that cluster is retrieved and the job is submitted through it.
+ *
+ * @param <ClusterID> the type of the id of the cluster.
+ * @param <ClientFactory> the type of the {@link ClusterClientFactory} used to create/retrieve a client to the target cluster.
+ */
+@Internal
+public class AbstractSessionClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements Executor {
+
+	private final ClientFactory clusterClientFactory;
+
+	/**
+	 * Creates an executor backed by the given client factory.
+	 *
+	 * @param clusterClientFactory the factory providing the cluster descriptor and the cluster id; must not be {@code null}.
+	 */
+	public AbstractSessionClusterExecutor(@Nonnull final ClientFactory clusterClientFactory) {
+		this.clusterClientFactory = checkNotNull(clusterClientFactory);
+	}
+
+	/**
+	 * Translates the pipeline into a {@link JobGraph} and submits it to the cluster named in the configuration.
+	 *
+	 * @param pipeline the pipeline to execute.
+	 * @param configuration the configuration used to build the job graph and to locate the target cluster.
+	 * @return a future that yields a {@link JobClient} for the submitted job once the submission has been acknowledged.
+	 * @throws IllegalStateException if the client factory returns no cluster id for the configuration.
+	 * @throws Exception if the translation, the client retrieval or closing the cluster descriptor fails.
+	 */
+	@Override
+	public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception {
+		final JobGraph jobGraph = ExecutorUtils.getJobGraph(pipeline, configuration);
+
+		try (final ClusterDescriptor<ClusterID> descriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
+			final ClusterID targetClusterId = clusterClientFactory.getClusterId(configuration);
+			checkState(targetClusterId != null);
+
+			final ClusterClient<ClusterID> client = descriptor.retrieve(targetClusterId);
+			return client
+					.submitJob(jobGraph)
+					.thenApply(JobSubmissionResult::getJobID)
+					.thenApply(submittedJobId -> new JobClientImpl<>(client, submittedJobId));
+		}
+	}
 }
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java
index e134206..4541b1c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java
@@ -1,4 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.flink.client.deployment;
 
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.FlinkPipelineTranslationUtil;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import javax.annotation.Nonnull;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utility class with methods related to job execution.
+ */
 public class ExecutorUtils {
+
+	/**
+	 * Translates the given {@link Pipeline} into a {@link JobGraph} and applies the execution settings
+	 * read from the configuration to it.
+	 *
+	 * @param pipeline the pipeline to translate; must not be {@code null}.
+	 * @param configuration the configuration supplying the parallelism used for the translation as well as
+	 *                      the jars, classpaths and savepoint restore settings set on the resulting graph;
+	 *                      must not be {@code null}.
+	 * @return the {@link JobGraph} of the pipeline with jars, classpaths and savepoint restore settings applied.
+	 */
+	public static JobGraph getJobGraph(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) {
+		checkNotNull(pipeline);
+		checkNotNull(configuration);
+
+		final ExecutionConfigAccessor settings = ExecutionConfigAccessor.fromConfiguration(configuration);
+		final int parallelism = settings.getParallelism();
+
+		final JobGraph translatedGraph = FlinkPipelineTranslationUtil.getJobGraph(pipeline, configuration, parallelism);
+		translatedGraph.addJars(settings.getJars());
+		translatedGraph.setClasspaths(settings.getClasspaths());
+		translatedGraph.setSavepointRestoreSettings(settings.getSavepointRestoreSettings());
+
+		return translatedGraph;
+	}
 }
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/JobClientImpl.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/JobClientImpl.java
index e811fc8..29d8008 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/JobClientImpl.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/JobClientImpl.java
@@ -1,4 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.flink.client.deployment;
 
-public class JobClientImpl {
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.DetachedJobExecutionResult;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.util.ExceptionUtils;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link JobClient} interface that talks to the cluster through a {@link ClusterClient}.
+ *
+ * @param <ClusterID> the type of the id of the cluster the wrapped client is connected to.
+ */
+public class JobClientImpl<ClusterID> implements JobClient {
+
+	private final ClusterClient<ClusterID> clusterClient;
+
+	private final JobID jobID;
+
+	/**
+	 * Creates a client for the given job.
+	 *
+	 * @param clusterClient the client used to query the job's result; closed by {@link #close()}. Must not be {@code null}.
+	 * @param jobID the id of the job this client refers to; must not be {@code null}.
+	 */
+	public JobClientImpl(final ClusterClient<ClusterID> clusterClient, final JobID jobID) {
+		this.jobID = checkNotNull(jobID);
+		this.clusterClient = checkNotNull(clusterClient);
+	}
+
+	@Override
+	public JobID getJobID() {
+		return jobID;
+	}
+
+	/**
+	 * Returns an already completed future holding a {@link DetachedJobExecutionResult} for this job's id.
+	 */
+	@Override
+	public CompletableFuture<JobExecutionResult> getJobSubmissionResult() {
+		final JobExecutionResult detachedResult = new DetachedJobExecutionResult(jobID);
+		return CompletableFuture.completedFuture(detachedResult);
+	}
+
+	/**
+	 * Requests the job's result from the cluster and converts it into a {@link JobExecutionResult}.
+	 *
+	 * @param userClassloader the class loader handed to the result conversion.
+	 * @return a future completed with the converted result, or completed exceptionally with a
+	 *         {@link ProgramInvocationException} if the request or the conversion fails.
+	 */
+	@Override
+	public CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull final ClassLoader userClassloader) {
+		final CompletableFuture<JobExecutionResult> executionResultFuture = new CompletableFuture<>();
+
+		final CompletableFuture<JobResult> jobResultFuture = clusterClient.requestJobResult(jobID);
+		jobResultFuture.whenComplete((jobResult, failure) -> {
+			// The request itself failed: report it and stop here.
+			if (failure != null) {
+				ExceptionUtils.checkInterrupted(failure);
+				executionResultFuture.completeExceptionally(new ProgramInvocationException("Could not run job", jobID, failure));
+				return;
+			}
+
+			try {
+				executionResultFuture.complete(jobResult.toJobExecutionResult(userClassloader));
+			} catch (JobExecutionException | IOException | ClassNotFoundException e) {
+				executionResultFuture.completeExceptionally(new ProgramInvocationException("Job failed", jobID, e));
+			}
+		});
+		return executionResultFuture;
+	}
+
+	/**
+	 * Closes the wrapped {@link ClusterClient}.
+	 */
+	@Override
+	public void close() throws Exception {
+		this.clusterClient.close();
+	}
 }
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
new file mode 100644
index 0000000..f097323
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.executors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.client.deployment.AbstractSessionClusterExecutor;
+import org.apache.flink.client.deployment.StandaloneClientFactory;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.core.execution.Executor;
+
+/**
+ * The {@link Executor} to be used when executing a job on an already running cluster.
+ *
+ * <p>All execution logic is inherited from {@link AbstractSessionClusterExecutor}; this class only
+ * supplies a {@link StandaloneClientFactory}.
+ */
+@Internal
+public class StandaloneSessionClusterExecutor extends AbstractSessionClusterExecutor<StandaloneClusterId, StandaloneClientFactory> {
+
+	/**
+	 * Name of this executor. {@link StandaloneSessionClusterExecutorFactory} compares it,
+	 * ignoring case, against the configured deployment target.
+	 */
+	public static final String NAME = "standalone-session-cluster";
+
+	/**
+	 * Creates the executor with a new {@link StandaloneClientFactory}.
+	 */
+	public StandaloneSessionClusterExecutor() {
+		super(new StandaloneClientFactory());
+	}
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutorFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutorFactory.java
new file mode 100644
index 0000000..43c116e
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutorFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.executors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.ExecutorFactory;
+
+import javax.annotation.Nonnull;
+
+/**
+ * An {@link ExecutorFactory} for executing jobs on an existing (session) cluster.
+ */
+@Internal
+public class StandaloneSessionClusterExecutorFactory implements ExecutorFactory {
+
+	/**
+	 * Checks whether the {@link DeploymentOptions#TARGET} option of the given configuration names the
+	 * {@link StandaloneSessionClusterExecutor}. The comparison ignores case.
+	 *
+	 * @param configuration the configuration to inspect.
+	 * @return {@code true} if the configured target equals {@link StandaloneSessionClusterExecutor#NAME};
+	 *         {@code false} otherwise, including when the option resolves to {@code null}.
+	 */
+	@Override
+	public boolean isCompatibleWith(@Nonnull final Configuration configuration) {
+		// Call equalsIgnoreCase on the constant: it returns false for a null argument,
+		// so a missing target no longer causes a NullPointerException.
+		return StandaloneSessionClusterExecutor.NAME
+				.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));
+	}
+
+	/**
+	 * Creates a new {@link StandaloneSessionClusterExecutor}; the configuration is not consulted.
+	 *
+	 * @param configuration unused.
+	 * @return a new executor instance.
+	 */
+	@Override
+	public Executor getExecutor(@Nonnull final Configuration configuration) {
+		return new StandaloneSessionClusterExecutor();
+	}
+}
diff --git a/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory b/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
new file mode 100644
index 0000000..d9b144f
--- /dev/null
+++ b/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.client.deployment.executors.StandaloneSessionClusterExecutorFactory
\ No newline at end of file
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
new file mode 100644
index 0000000..084b020
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.executors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.client.deployment.AbstractJobClusterExecutor;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.yarn.YarnClusterClientFactory;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * The {@link Executor} to be used when executing a job in isolation.
+ * This executor will start a cluster specifically for the job at hand and
+ * tear it down when the job is finished either successfully or due to an error.
+ *
+ * <p>All execution logic is inherited from {@link AbstractJobClusterExecutor}; this class only
+ * supplies a {@link YarnClusterClientFactory}.
+ */
+@Internal
+public class YarnJobClusterExecutor extends AbstractJobClusterExecutor<ApplicationId, YarnClusterClientFactory> {
+
+	/**
+	 * Name of this executor. {@link YarnJobClusterExecutorFactory} compares it,
+	 * ignoring case, against the configured deployment target.
+	 */
+	public static final String NAME = "yarn-job-cluster";
+
+	/**
+	 * Creates the executor with a new {@link YarnClusterClientFactory}.
+	 */
+	public YarnJobClusterExecutor() {
+		super(new YarnClusterClientFactory());
+	}
+}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java
new file mode 100644
index 0000000..9dc6fd1
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.executors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.ExecutorFactory;
+
+import javax.annotation.Nonnull;
+
+/**
+ * An {@link ExecutorFactory} for executing jobs on dedicated (per-job) clusters.
+ */
+@Internal
+public class YarnJobClusterExecutorFactory implements ExecutorFactory {
+
+	/**
+	 * Checks whether the {@link DeploymentOptions#TARGET} option of the given configuration names the
+	 * {@link YarnJobClusterExecutor}. The comparison ignores case.
+	 *
+	 * @param configuration the configuration to inspect.
+	 * @return {@code true} if the configured target equals {@link YarnJobClusterExecutor#NAME};
+	 *         {@code false} otherwise, including when the option resolves to {@code null}.
+	 */
+	@Override
+	public boolean isCompatibleWith(@Nonnull final Configuration configuration) {
+		// Call equalsIgnoreCase on the constant: it returns false for a null argument,
+		// so a missing target no longer causes a NullPointerException.
+		return YarnJobClusterExecutor.NAME
+				.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));
+	}
+
+	/**
+	 * Creates a new {@link YarnJobClusterExecutor}; the configuration is not consulted.
+	 *
+	 * @param configuration unused.
+	 * @return a new executor instance.
+	 */
+	@Override
+	public Executor getExecutor(@Nonnull final Configuration configuration) {
+		return new YarnJobClusterExecutor();
+	}
+}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
new file mode 100644
index 0000000..873dce4
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.executors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.client.deployment.AbstractSessionClusterExecutor;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.yarn.YarnClusterClientFactory;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * The {@link Executor} to be used when executing a job on an already running cluster.
+ *
+ * <p>All execution logic is inherited from {@link AbstractSessionClusterExecutor}; this class only
+ * supplies a {@link YarnClusterClientFactory}.
+ */
+@Internal
+public class YarnSessionClusterExecutor extends AbstractSessionClusterExecutor<ApplicationId, YarnClusterClientFactory> {
+
+	/**
+	 * Name of this executor. {@link YarnSessionClusterExecutorFactory} compares it,
+	 * ignoring case, against the configured deployment target.
+	 */
+	public static final String NAME = "yarn-session-cluster";
+
+	/**
+	 * Creates the executor with a new {@link YarnClusterClientFactory}.
+	 */
+	public YarnSessionClusterExecutor() {
+		super(new YarnClusterClientFactory());
+	}
+}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java
new file mode 100644
index 0000000..101a622
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.executors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.ExecutorFactory;
+
+import javax.annotation.Nonnull;
+
+/**
+ * An {@link ExecutorFactory} for executing jobs on an existing (session) cluster.
+ */
+@Internal
+public class YarnSessionClusterExecutorFactory implements ExecutorFactory {
+
+	/**
+	 * Checks whether the {@link DeploymentOptions#TARGET} option of the given configuration names the
+	 * {@link YarnSessionClusterExecutor}. The comparison ignores case.
+	 *
+	 * @param configuration the configuration to inspect.
+	 * @return {@code true} if the configured target equals {@link YarnSessionClusterExecutor#NAME};
+	 *         {@code false} otherwise, including when the option resolves to {@code null}.
+	 */
+	@Override
+	public boolean isCompatibleWith(@Nonnull final Configuration configuration) {
+		// Call equalsIgnoreCase on the constant: it returns false for a null argument,
+		// so a missing target no longer causes a NullPointerException.
+		return YarnSessionClusterExecutor.NAME
+				.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));
+	}
+
+	/**
+	 * Creates a new {@link YarnSessionClusterExecutor}; the configuration is not consulted.
+	 *
+	 * @param configuration unused.
+	 * @return a new executor instance.
+	 */
+	@Override
+	public Executor getExecutor(@Nonnull final Configuration configuration) {
+		return new YarnSessionClusterExecutor();
+	}
+}
diff --git a/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory b/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
new file mode 100644
index 0000000..d56f8c5
--- /dev/null
+++ b/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.yarn.executors.YarnJobClusterExecutorFactory
+org.apache.flink.yarn.executors.YarnSessionClusterExecutorFactory
\ No newline at end of file