You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/18 13:11:14 UTC

[flink] 18/19: [FLINK-XXXXX] Deduplicate the Executor code and fix javadocs.

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8212ba8d3721f7fc985036473b76bee32eddbdc7
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Nov 18 10:25:19 2019 +0100

    [FLINK-XXXXX] Deduplicate the Executor code and fix javadocs.
---
 .../deployment/AbstractJobClusterExecutor.java     | 71 ++++++++++++++++++++++
 .../deployment/AbstractSessionClusterExecutor.java | 66 ++++++++++++++++++++
 .../flink/client/deployment/ExecutorUtils.java     | 59 ++++++++++++++++++
 .../StandaloneSessionClusterExecutor.java          | 63 +------------------
 .../org/apache/flink/core/execution/Executor.java  | 13 ++--
 .../yarn/executors/YarnJobClusterExecutor.java     | 68 +--------------------
 .../yarn/executors/YarnSessionClusterExecutor.java | 53 +---------------
 7 files changed, 214 insertions(+), 179 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
new file mode 100644
index 0000000..310dd61
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.client.deployment.executors.JobClientImpl;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An abstract {@link Executor} used to execute {@link Pipeline pipelines} on dedicated (per-job) clusters.
+ *
+ * @param <ClusterID> the type of the id of the cluster.
+ * @param <ClientFactory> the type of the {@link ClusterClientFactory} used to create/retrieve a client to the target cluster.
+ */
+@Internal
+public class AbstractJobClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements Executor {
+
+	private static final Logger LOG = LoggerFactory.getLogger(AbstractJobClusterExecutor.class);
+
+	private final ClientFactory clusterClientFactory;
+
+	public AbstractJobClusterExecutor(@Nonnull final ClientFactory clusterClientFactory) {
+		this.clusterClientFactory = checkNotNull(clusterClientFactory);
+	}
+
+	@Override
+	public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception {
+		final JobGraph jobGraph = ExecutorUtils.getJobGraph(pipeline, configuration);
+
+		try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
+			final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
+
+			final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
+
+			final ClusterClient<ClusterID> client = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode());
+			LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());
+			return CompletableFuture.completedFuture(new JobClientImpl<>(client, jobGraph.getJobID()));
+		}
+	}
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java
new file mode 100644
index 0000000..2150b18
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An abstract {@link Executor} used to execute {@link Pipeline pipelines} on an existing (session) cluster.
+ *
+ * @param <ClusterID> the type of the id of the cluster.
+ * @param <ClientFactory> the type of the {@link ClusterClientFactory} used to create/retrieve a client to the target cluster.
+ */
+@Internal
+public class AbstractSessionClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements Executor {
+
+	private final ClientFactory clusterClientFactory;
+
+	public AbstractSessionClusterExecutor(@Nonnull final ClientFactory clusterClientFactory) {
+		this.clusterClientFactory = checkNotNull(clusterClientFactory);
+	}
+
+	@Override
+	public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception {
+		final JobGraph jobGraph = ExecutorUtils.getJobGraph(pipeline, configuration);
+
+		try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
+			final ClusterID clusterID = clusterClientFactory.getClusterId(configuration);
+			checkState(clusterID != null);
+
+			// the caller should take care of managing the life-cycle of the returned JobClient.
+
+			final ClusterClient<ClusterID> clusterClient = clusterDescriptor.retrieve(clusterID);
+			return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
+		}
+	}
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java
new file mode 100644
index 0000000..af540cb
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.FlinkPipelineTranslationUtil;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import javax.annotation.Nonnull;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utility class with methods related to job execution.
+ */
+public class ExecutorUtils {
+
+	/**
+	 * Creates the {@link JobGraph} corresponding to the provided {@link Pipeline}.
+	 *
+	 * @param pipeline the pipeline whose job graph we are computing
+	 * @param configuration the configuration with the necessary information such as jars and
+	 *                         classpaths to be included, the parallelism of the job and potential
+	 *                         savepoint settings used to bootstrap its state.
+	 * @return the corresponding {@link JobGraph}.
+	 */
+	public static JobGraph getJobGraph(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) {
+		checkNotNull(pipeline);
+		checkNotNull(configuration);
+
+		final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
+		final JobGraph jobGraph = FlinkPipelineTranslationUtil
+				.getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());
+
+		jobGraph.addJars(executionConfigAccessor.getJars());
+		jobGraph.setClasspaths(executionConfigAccessor.getClasspaths());
+		jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
+
+		return jobGraph;
+	}
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
index df4d2ba..f097323 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
@@ -19,77 +19,20 @@
 package org.apache.flink.client.deployment.executors;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.dag.Pipeline;
-import org.apache.flink.client.ClientUtils;
-import org.apache.flink.client.FlinkPipelineTranslationUtil;
-import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.client.deployment.AbstractSessionClusterExecutor;
 import org.apache.flink.client.deployment.StandaloneClientFactory;
-import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
 import org.apache.flink.client.deployment.StandaloneClusterId;
-import org.apache.flink.client.program.rest.RestClusterClient;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.Executor;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-
-import java.net.URL;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * The {@link Executor} to be used when executing a job on an already running cluster.
  */
 @Internal
-public class StandaloneSessionClusterExecutor implements Executor {
+public class StandaloneSessionClusterExecutor extends AbstractSessionClusterExecutor<StandaloneClusterId, StandaloneClientFactory> {
 
 	public static final String NAME = "standalone-session-cluster";
 
-	private final StandaloneClientFactory clusterClientFactory;
-
 	public StandaloneSessionClusterExecutor() {
-		this.clusterClientFactory = new StandaloneClientFactory();
-	}
-
-	@Override
-	public CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration) throws Exception {
-		final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
-
-		final List<URL> dependencies = configAccessor.getJars();
-		final List<URL> classpaths = configAccessor.getClasspaths();
-
-		final JobGraph jobGraph = getJobGraph(pipeline, configuration, classpaths, dependencies);
-
-		try (final StandaloneClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
-			final StandaloneClusterId clusterID = clusterClientFactory.getClusterId(configuration);
-			checkState(clusterID != null);
-
-			final RestClusterClient<StandaloneClusterId> clusterClient = clusterDescriptor.retrieve(clusterID);
-			return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
-		}
-	}
-
-	private JobGraph getJobGraph(
-			final Pipeline pipeline,
-			final Configuration configuration,
-			final List<URL> classpaths,
-			final List<URL> libraries) {
-
-		checkNotNull(pipeline);
-		checkNotNull(configuration);
-		checkNotNull(classpaths);
-		checkNotNull(libraries);
-
-		final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
-		final JobGraph jobGraph = FlinkPipelineTranslationUtil
-				.getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());
-
-		jobGraph.addJars(libraries);
-		jobGraph.setClasspaths(classpaths);
-		jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
-
-		return jobGraph;
+		super(new StandaloneClientFactory());
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
index 5be3193..b585be6 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
@@ -18,10 +18,11 @@
 
 package org.apache.flink.core.execution;
 
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.configuration.Configuration;
 
+import javax.annotation.Nonnull;
+
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -30,11 +31,15 @@ import java.util.concurrent.CompletableFuture;
 public interface Executor {
 
 	/**
-	 * Executes a {@link Pipeline} based on the provided configuration.
+	 * Executes a {@link Pipeline} based on the provided configuration and returns a {@link JobClient} which allows
+	 * interacting with the job being executed, e.g. cancelling it or taking a savepoint.
+	 *
+	 * <p><b>ATTENTION:</b> The caller is responsible for managing the lifecycle of the returned {@link JobClient}. This
+	 * means that e.g. {@code close()} should be called explicitly at the call-site.
 	 *
 	 * @param pipeline the {@link Pipeline} to execute
 	 * @param configuration the {@link Configuration} with the required execution parameters
-	 * @return the {@link JobExecutionResult} corresponding to the pipeline execution.
+	 * @return a {@link CompletableFuture} with the {@link JobClient} corresponding to the pipeline.
 	 */
-	CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception;
+	CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception;
 }
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
index ead6e9a..084b020 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
@@ -19,28 +19,11 @@
 package org.apache.flink.yarn.executors;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.dag.Pipeline;
-import org.apache.flink.client.FlinkPipelineTranslationUtil;
-import org.apache.flink.client.cli.ExecutionConfigAccessor;
-import org.apache.flink.client.deployment.ClusterSpecification;
-import org.apache.flink.client.deployment.executors.JobClientImpl;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.client.deployment.AbstractJobClusterExecutor;
 import org.apache.flink.core.execution.Executor;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.yarn.YarnClusterClientFactory;
-import org.apache.flink.yarn.YarnClusterDescriptor;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.URL;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * The {@link Executor} to be used when executing a job in isolation.
@@ -48,56 +31,11 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * tear it down when the job is finished either successfully or due to an error.
  */
 @Internal
-public class YarnJobClusterExecutor implements Executor {
-
-	private static final Logger LOG = LoggerFactory.getLogger(YarnJobClusterExecutor.class);
+public class YarnJobClusterExecutor extends AbstractJobClusterExecutor<ApplicationId, YarnClusterClientFactory> {
 
 	public static final String NAME = "yarn-job-cluster";
 
-	private final YarnClusterClientFactory clusterClientFactory;
-
 	public YarnJobClusterExecutor() {
-		this.clusterClientFactory = new YarnClusterClientFactory();
-	}
-
-	@Override
-	public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration executionConfig) throws Exception {
-
-		try (final YarnClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(executionConfig)) {
-			final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(executionConfig);
-
-			final List<URL> dependencies = configAccessor.getJars();
-			final List<URL> classpaths = configAccessor.getClasspaths();
-
-			final JobGraph jobGraph = getJobGraph(pipeline, executionConfig, classpaths, dependencies);
-
-			final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executionConfig);
-
-			final ClusterClient<ApplicationId> client = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode());
-			LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());
-			return CompletableFuture.completedFuture(new JobClientImpl<>(client, jobGraph.getJobID()));
-		}
-	}
-
-	private JobGraph getJobGraph(
-			final Pipeline pipeline,
-			final Configuration configuration,
-			final List<URL> classpaths,
-			final List<URL> libraries) {
-
-		checkNotNull(pipeline);
-		checkNotNull(configuration);
-		checkNotNull(classpaths);
-		checkNotNull(libraries);
-
-		final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
-		final JobGraph jobGraph = FlinkPipelineTranslationUtil
-				.getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());
-
-		jobGraph.addJars(libraries);
-		jobGraph.setClasspaths(classpaths);
-		jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
-
-		return jobGraph;
+		super(new YarnClusterClientFactory());
 	}
 }
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
index de4d148..873dce4 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
@@ -19,68 +19,21 @@
 package org.apache.flink.yarn.executors;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.dag.Pipeline;
-import org.apache.flink.client.ClientUtils;
-import org.apache.flink.client.FlinkPipelineTranslationUtil;
-import org.apache.flink.client.cli.ExecutionConfigAccessor;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.client.deployment.AbstractSessionClusterExecutor;
 import org.apache.flink.core.execution.Executor;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.yarn.YarnClusterClientFactory;
-import org.apache.flink.yarn.YarnClusterDescriptor;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
 /**
  * The {@link Executor} to be used when executing a job on an already running cluster.
  */
 @Internal
-public class YarnSessionClusterExecutor implements Executor {
+public class YarnSessionClusterExecutor extends AbstractSessionClusterExecutor<ApplicationId, YarnClusterClientFactory> {
 
 	public static final String NAME = "yarn-session-cluster";
 
-	private final YarnClusterClientFactory clusterClientFactory;
-
 	public YarnSessionClusterExecutor() {
-		this.clusterClientFactory = new YarnClusterClientFactory();
-	}
-
-	@Override
-	public CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration) throws Exception {
-		final JobGraph jobGraph = getJobGraph(pipeline, configuration);
-
-		try (final YarnClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
-			final ApplicationId clusterID = clusterClientFactory.getClusterId(configuration);
-			checkState(clusterID != null);
-
-			// TODO: 17.11.19 we cannot close the client here because we simply have a future of the client
-			final ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve(clusterID);
-			return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
-		}
-	}
-
-	private JobGraph getJobGraph(
-			final Pipeline pipeline,
-			final Configuration configuration) {
-
-		checkNotNull(pipeline);
-		checkNotNull(configuration);
-
-		final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
-		final JobGraph jobGraph = FlinkPipelineTranslationUtil
-				.getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());
-
-		jobGraph.addJars(executionConfigAccessor.getJars());
-		jobGraph.setClasspaths(executionConfigAccessor.getClasspaths());
-		jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
-
-		return jobGraph;
+		super(new YarnClusterClientFactory());
 	}
 }