You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/19 04:42:22 UTC
[flink] 11/15: [FLINK-XXXXX] Add the Yarn/Standalone Executors
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch executors-clean
in repository https://gitbox.apache.org/repos/asf/flink.git
commit fc182735564bf6a6a0fdd687ec260783ceeda8e2
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Nov 18 17:10:15 2019 +0100
[FLINK-XXXXX] Add the Yarn/Standalone Executors
---
.../deployment/AbstractJobClusterExecutor.java | 68 ++++++++++++++++-
.../deployment/AbstractSessionClusterExecutor.java | 65 ++++++++++++++++-
.../flink/client/deployment/ExecutorUtils.java | 55 ++++++++++++++
.../flink/client/deployment/JobClientImpl.java | 85 +++++++++++++++++++++-
.../StandaloneSessionClusterExecutor.java | 38 ++++++++++
.../StandaloneSessionClusterExecutorFactory.java | 45 ++++++++++++
...org.apache.flink.core.execution.ExecutorFactory | 16 ++++
.../yarn/executors/YarnJobClusterExecutor.java | 41 +++++++++++
.../executors/YarnJobClusterExecutorFactory.java | 45 ++++++++++++
.../yarn/executors/YarnSessionClusterExecutor.java | 39 ++++++++++
.../YarnSessionClusterExecutorFactory.java | 45 ++++++++++++
...org.apache.flink.core.execution.ExecutorFactory | 17 +++++
12 files changed, 556 insertions(+), 3 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
index 14a93bc..bca403a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
@@ -1,4 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.flink.client.deployment;
-public class AbstractJobClusterExecutor {
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An abstract {@link Executor} used to execute {@link Pipeline pipelines} on dedicated (per-job) clusters.
+ *
+ * @param <ClusterID> the type of the id of the cluster.
+ * @param <ClientFactory> the type of the {@link ClusterClientFactory} used to create/retrieve a client to the target cluster.
+ */
+@Internal
+public class AbstractJobClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements Executor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractJobClusterExecutor.class);
+
+ private final ClientFactory clusterClientFactory;
+
+ public AbstractJobClusterExecutor(@Nonnull final ClientFactory clusterClientFactory) {
+ this.clusterClientFactory = checkNotNull(clusterClientFactory);
+ }
+
+ @Override
+ public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception {
+ final JobGraph jobGraph = ExecutorUtils.getJobGraph(pipeline, configuration);
+ // The descriptor is only used for the deployment below and is closed when the try block exits.
+ try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
+ final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
+
+ final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
+ // The client returned by the deployment is handed to the JobClientImpl that is returned to the caller.
+ final ClusterClient<ClusterID> clusterClient = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode());
+ LOG.info("Job has been submitted with JobID {}", jobGraph.getJobID());
+ return CompletableFuture.completedFuture(new JobClientImpl<>(clusterClient, jobGraph.getJobID()));
+ }
+ }
}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java
index ab8cc01..413ff58 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java
@@ -1,4 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.flink.client.deployment;
-public class AbstractSessionClusterExecutor {
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An abstract {@link Executor} used to execute {@link Pipeline pipelines} on an existing (session) cluster.
+ *
+ * @param <ClusterID> the type of the id of the cluster.
+ * @param <ClientFactory> the type of the {@link ClusterClientFactory} used to create/retrieve a client to the target cluster.
+ */
+@Internal
+public class AbstractSessionClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements Executor {
+
+ private final ClientFactory clusterClientFactory;
+
+ public AbstractSessionClusterExecutor(@Nonnull final ClientFactory clusterClientFactory) {
+ this.clusterClientFactory = checkNotNull(clusterClientFactory);
+ }
+
+ @Override
+ public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception {
+ final JobGraph jobGraph = ExecutorUtils.getJobGraph(pipeline, configuration);
+
+ try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
+ final ClusterID clusterID = clusterClientFactory.getClusterId(configuration);
+ checkState(clusterID != null, "The configuration does not contain the id of the session cluster to submit to.");
+ // The retrieved client is passed on to the JobClientImpl created once the submission yields the job id.
+ final ClusterClient<ClusterID> clusterClient = clusterDescriptor.retrieve(clusterID);
+ return clusterClient
+ .submitJob(jobGraph)
+ .thenApply(JobSubmissionResult::getJobID)
+ .thenApply(jobID -> new JobClientImpl<>(clusterClient, jobID));
+ }
+ }
}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java
index e134206..4541b1c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java
@@ -1,4 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.flink.client.deployment;
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.FlinkPipelineTranslationUtil;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import javax.annotation.Nonnull;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utility class with methods related to job execution.
+ */
public class ExecutorUtils {
+
+ /**
+ * Translates the given {@link Pipeline} into a {@link JobGraph} and applies the execution settings to it.
+ *
+ * @param pipeline the pipeline to translate
+ * @param configuration the configuration from which the parallelism, the jars, the classpaths
+ * and the savepoint restore settings of the job are read through an
+ * {@link ExecutionConfigAccessor}.
+ * @return the resulting {@link JobGraph}.
+ */
+ public static JobGraph getJobGraph(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) {
+ checkNotNull(pipeline);
+ checkNotNull(configuration);
+
+ final ExecutionConfigAccessor accessor = ExecutionConfigAccessor.fromConfiguration(configuration);
+ final JobGraph graph = FlinkPipelineTranslationUtil.getJobGraph(pipeline, configuration, accessor.getParallelism());
+
+ // Copy the remaining execution settings from the configuration onto the translated graph.
+ graph.addJars(accessor.getJars());
+ graph.setClasspaths(accessor.getClasspaths());
+ graph.setSavepointRestoreSettings(accessor.getSavepointRestoreSettings());
+
+ return graph;
+ }
}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/JobClientImpl.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/JobClientImpl.java
index e811fc8..29d8008 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/JobClientImpl.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/JobClientImpl.java
@@ -1,4 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.flink.client.deployment;
-public class JobClientImpl {
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.DetachedJobExecutionResult;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.util.ExceptionUtils;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link JobClient} interface.
+ */
+public class JobClientImpl<ClusterID> implements JobClient {
+
+ private final ClusterClient<ClusterID> clusterClient;
+
+ private final JobID jobID;
+
+ public JobClientImpl(final ClusterClient<ClusterID> clusterClient, final JobID jobID) {
+ this.jobID = checkNotNull(jobID);
+ this.clusterClient = checkNotNull(clusterClient);
+ }
+
+ @Override
+ public JobID getJobID() {
+ return jobID;
+ }
+
+ @Override
+ public CompletableFuture<JobExecutionResult> getJobSubmissionResult() {
+ return CompletableFuture.completedFuture(new DetachedJobExecutionResult(jobID));
+ }
+
+ @Override
+ public CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull final ClassLoader userClassloader) {
+ final CompletableFuture<JobExecutionResult> executionResult = new CompletableFuture<>();
+
+ clusterClient.requestJobResult(jobID).whenComplete((jobResult, failure) -> {
+ if (failure != null) {
+ ExceptionUtils.checkInterrupted(failure);
+ executionResult.completeExceptionally(new ProgramInvocationException("Could not run job", jobID, failure));
+ return;
+ }
+ // The request itself succeeded: convert the job result with the user class loader.
+ try {
+ executionResult.complete(jobResult.toJobExecutionResult(userClassloader));
+ } catch (JobExecutionException | IOException | ClassNotFoundException e) {
+ executionResult.completeExceptionally(new ProgramInvocationException("Job failed", jobID, e));
+ }
+ });
+ return executionResult;
+ }
+
+ @Override
+ public void close() throws Exception {
+ clusterClient.close();
+ }
}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
new file mode 100644
index 0000000..f097323
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.executors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.client.deployment.AbstractSessionClusterExecutor;
+import org.apache.flink.client.deployment.StandaloneClientFactory;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.core.execution.Executor;
+
+/**
+ * The {@link Executor} to be used when executing a job on an already running cluster.
+ */
+@Internal
+public class StandaloneSessionClusterExecutor extends AbstractSessionClusterExecutor<StandaloneClusterId, StandaloneClientFactory> {
+ /** Name of this executor; {@link StandaloneSessionClusterExecutorFactory} compares it with the configured deployment target. */
+ public static final String NAME = "standalone-session-cluster";
+ /** Creates the executor on top of a new {@link StandaloneClientFactory}. */
+ public StandaloneSessionClusterExecutor() {
+ super(new StandaloneClientFactory());
+ }
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutorFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutorFactory.java
new file mode 100644
index 0000000..43c116e
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutorFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.executors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.ExecutorFactory;
+
+import javax.annotation.Nonnull;
+
+/**
+ * An {@link ExecutorFactory} for executing jobs on an existing (session) cluster.
+ */
+@Internal
+public class StandaloneSessionClusterExecutorFactory implements ExecutorFactory {
+
+ @Override
+ public boolean isCompatibleWith(@Nonnull final Configuration configuration) {
+ // Compare starting from the constant so that a configuration without a target yields false instead of an NPE.
+ return StandaloneSessionClusterExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));
+ }
+
+ @Override
+ public Executor getExecutor(@Nonnull final Configuration configuration) {
+ return new StandaloneSessionClusterExecutor();
+ }
+}
diff --git a/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory b/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
new file mode 100644
index 0000000..d9b144f
--- /dev/null
+++ b/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.client.deployment.executors.StandaloneSessionClusterExecutorFactory
\ No newline at end of file
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
new file mode 100644
index 0000000..084b020
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.executors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.client.deployment.AbstractJobClusterExecutor;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.yarn.YarnClusterClientFactory;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * The {@link Executor} to be used when executing a job in isolation.
+ * This executor will start a cluster specifically for the job at hand and
+ * tear it down when the job is finished either successfully or due to an error.
+ */
+@Internal
+public class YarnJobClusterExecutor extends AbstractJobClusterExecutor<ApplicationId, YarnClusterClientFactory> {
+ /** Name of this executor; {@link YarnJobClusterExecutorFactory} compares it with the configured deployment target. */
+ public static final String NAME = "yarn-job-cluster";
+ /** Creates the executor on top of a new {@link YarnClusterClientFactory}. */
+ public YarnJobClusterExecutor() {
+ super(new YarnClusterClientFactory());
+ }
+}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java
new file mode 100644
index 0000000..9dc6fd1
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.executors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.ExecutorFactory;
+
+import javax.annotation.Nonnull;
+
+/**
+ * An {@link ExecutorFactory} for executing jobs on dedicated (per-job) clusters.
+ */
+@Internal
+public class YarnJobClusterExecutorFactory implements ExecutorFactory {
+
+ @Override
+ public boolean isCompatibleWith(@Nonnull final Configuration configuration) {
+ // Compare starting from the constant so that a configuration without a target yields false instead of an NPE.
+ return YarnJobClusterExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));
+ }
+
+ @Override
+ public Executor getExecutor(@Nonnull final Configuration configuration) {
+ return new YarnJobClusterExecutor();
+ }
+}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
new file mode 100644
index 0000000..873dce4
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.executors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.client.deployment.AbstractSessionClusterExecutor;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.yarn.YarnClusterClientFactory;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * The {@link Executor} to be used when executing a job on an already running cluster.
+ */
+@Internal
+public class YarnSessionClusterExecutor extends AbstractSessionClusterExecutor<ApplicationId, YarnClusterClientFactory> {
+ /** Name of this executor; {@link YarnSessionClusterExecutorFactory} compares it with the configured deployment target. */
+ public static final String NAME = "yarn-session-cluster";
+ /** Creates the executor on top of a new {@link YarnClusterClientFactory}. */
+ public YarnSessionClusterExecutor() {
+ super(new YarnClusterClientFactory());
+ }
+}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java
new file mode 100644
index 0000000..101a622
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.executors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.ExecutorFactory;
+
+import javax.annotation.Nonnull;
+
+/**
+ * An {@link ExecutorFactory} for executing jobs on an existing (session) cluster.
+ */
+@Internal
+public class YarnSessionClusterExecutorFactory implements ExecutorFactory {
+
+ @Override
+ public boolean isCompatibleWith(@Nonnull final Configuration configuration) {
+ // Compare starting from the constant so that a configuration without a target yields false instead of an NPE.
+ return YarnSessionClusterExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));
+ }
+
+ @Override
+ public Executor getExecutor(@Nonnull final Configuration configuration) {
+ return new YarnSessionClusterExecutor();
+ }
+}
diff --git a/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory b/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
new file mode 100644
index 0000000..d56f8c5
--- /dev/null
+++ b/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.yarn.executors.YarnJobClusterExecutorFactory
+org.apache.flink.yarn.executors.YarnSessionClusterExecutorFactory
\ No newline at end of file