You are viewing a plain-text version of this content. The canonical (hyperlinked) version is the original page in the commits@flink.apache.org mailing-list archive.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/17 14:45:03 UTC

[flink] 10/14: [FLINK-XXXXX] Add standalone/yarn executors and their factories

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit aee9018a21895344936974b2ebdd6262da65ad20
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Sat Nov 16 21:32:08 2019 +0100

    [FLINK-XXXXX] Add standalone/yarn executors and their factories
---
 ....java => StandaloneSessionClusterExecutor.java} | 28 +++++++++++----------
 ...> StandaloneSessionClusterExecutorFactory.java} | 21 +++-------------
 ...org.apache.flink.core.execution.ExecutorFactory |  3 +--
 .../yarn/executors/YarnJobClusterExecutor.java     | 28 ++++++++++++---------
 .../executors/YarnJobClusterExecutorFactory.java   | 23 +++--------------
 .../yarn/executors/YarnSessionClusterExecutor.java | 29 ++++++++++++----------
 .../YarnSessionClusterExecutorFactory.java         | 23 +++--------------
 ...org.apache.flink.core.execution.ExecutorFactory |  4 +--
 8 files changed, 61 insertions(+), 98 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
similarity index 74%
copy from flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java
copy to flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
index b7eeb68..e5cc82e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
@@ -18,14 +18,15 @@
 
 package org.apache.flink.client.deployment.executors;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.FlinkPipelineTranslationUtil;
 import org.apache.flink.client.cli.ExecutionConfigAccessor;
-import org.apache.flink.client.deployment.ClusterClientFactory;
-import org.apache.flink.client.deployment.ClusterClientServiceLoader;
-import org.apache.flink.client.deployment.ClusterDescriptor;
-import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.deployment.StandaloneClientFactory;
+import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.JobClient;
@@ -41,12 +42,16 @@ import static org.apache.flink.util.Preconditions.checkState;
 /**
- * The {@link Executor} to be used when executing a job on an already running cluster.
+ * The {@link Executor} to be used when executing a job on an already running standalone session cluster.
  */
-public class SessionClusterExecutor<ClusterID> implements Executor {
+@Internal
+public class StandaloneSessionClusterExecutor implements Executor {
 
-	private final ClusterClientServiceLoader clusterClientServiceLoader;
+	/** Name of this executor, matched case-insensitively against the configured deployment target. */
+	public static final String NAME = "standalone-session-cluster";
 
-	public SessionClusterExecutor(final ClusterClientServiceLoader clusterClientServiceLoader) {
-		this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
+	private final StandaloneClientFactory clusterClientFactory;
+
+	public StandaloneSessionClusterExecutor() {
+		this.clusterClientFactory = new StandaloneClientFactory();
 	}
 
 	@Override
@@ -58,13 +63,11 @@ public class SessionClusterExecutor<ClusterID> implements Executor {
 
 		final JobGraph jobGraph = getJobGraph(pipeline, configuration, classpaths, dependencies);
 
-		final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(configuration);
-
-		try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
-			final ClusterID clusterID = clusterClientFactory.getClusterId(configuration);
+		try (final StandaloneClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
+			final StandaloneClusterId clusterID = clusterClientFactory.getClusterId(configuration);
 			checkState(clusterID != null);
 
-			try (final ClusterClient<ClusterID> clusterClient = clusterDescriptor.retrieve(clusterID)) {
+			try (final RestClusterClient<StandaloneClusterId> clusterClient = clusterDescriptor.retrieve(clusterID)) {
 				return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
 			}
 		}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutorFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutorFactory.java
similarity index 62%
copy from flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutorFactory.java
copy to flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutorFactory.java
index 8e5a5eb..06dd451 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutorFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutorFactory.java
@@ -19,39 +19,25 @@
 package org.apache.flink.client.deployment.executors;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.client.deployment.ClusterClientServiceLoader;
-import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
-import org.apache.flink.configuration.ClusterMode;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.ExecutorFactory;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
- * An {@link ExecutorFactory} for executing jobs on an existing (session) cluster.
+ * An {@link ExecutorFactory} for executing jobs on an existing standalone session cluster.
  */
 @Internal
-public class SessionClusterExecutorFactory implements ExecutorFactory {
-
-	private final ClusterClientServiceLoader clusterClientServiceLoader;
-
-	public SessionClusterExecutorFactory() {
-		this(new DefaultClusterClientServiceLoader());
-	}
-
-	public SessionClusterExecutorFactory(final ClusterClientServiceLoader clusterClientServiceLoader) {
-		this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
-	}
+public class StandaloneSessionClusterExecutorFactory implements ExecutorFactory {
 
 	@Override
 	public boolean isCompatibleWith(Configuration configuration) {
-		return configuration.get(DeploymentOptions.CLUSTER_MODE).equals(ClusterMode.SESSION);
+		// Call equalsIgnoreCase on the constant so that an unset (null) target yields false instead of an NPE.
+		return StandaloneSessionClusterExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));
 	}
 
 	@Override
 	public Executor getExecutor(Configuration configuration) {
-		return new SessionClusterExecutor<>(clusterClientServiceLoader);
+		return new StandaloneSessionClusterExecutor();
 	}
 }
diff --git a/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory b/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
index 870c57d..d9b144f 100644
--- a/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
+++ b/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
@@ -13,5 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.client.deployment.executors.JobClusterExecutorFactory
-org.apache.flink.client.deployment.executors.SessionClusterExecutorFactory
\ No newline at end of file
+org.apache.flink.client.deployment.executors.StandaloneSessionClusterExecutorFactory
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
similarity index 75%
rename from flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutor.java
rename to flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
index 902d7fd..09094e7 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
@@ -16,21 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.flink.client.deployment.executors;
+package org.apache.flink.yarn.executors;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.client.FlinkPipelineTranslationUtil;
 import org.apache.flink.client.cli.ExecutionConfigAccessor;
-import org.apache.flink.client.deployment.ClusterClientFactory;
-import org.apache.flink.client.deployment.ClusterClientServiceLoader;
-import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.deployment.executors.JobClientImpl;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.yarn.YarnClusterClientFactory;
+import org.apache.flink.yarn.YarnClusterDescriptor;
 
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,21 +47,23 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * This executor will start a cluster specifically for the job at hand and
  * tear it down when the job is finished either successfully or due to an error.
  */
-public class JobClusterExecutor<ClusterID> implements Executor {
+@Internal
+public class YarnJobClusterExecutor implements Executor {
 
-	private static final Logger LOG = LoggerFactory.getLogger(JobClusterExecutor.class);
+	private static final Logger LOG = LoggerFactory.getLogger(YarnJobClusterExecutor.class);
 
-	private final ClusterClientServiceLoader clusterClientServiceLoader;
+	/** Name of this executor, matched case-insensitively against the configured deployment target. */
+	public static final String NAME = "yarn-job-cluster";
 
-	public JobClusterExecutor(final ClusterClientServiceLoader clusterClientServiceLoader) {
-		this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
+	private final YarnClusterClientFactory clusterClientFactory;
+
+	public YarnJobClusterExecutor() {
+		this.clusterClientFactory = new YarnClusterClientFactory();
 	}
 
 	@Override
 	public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration executionConfig) throws Exception {
-		final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(executionConfig);
-
-		try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(executionConfig)) {
+		try (final YarnClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(executionConfig)) {
 			final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(executionConfig);
 
 			final List<URL> dependencies = configAccessor.getJars();
@@ -69,7 +73,9 @@ public class JobClusterExecutor<ClusterID> implements Executor {
 
 			final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executionConfig);
 
-			try (final ClusterClient<ClusterID> client = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode())) {
-				LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());
+			try (final ClusterClient<ApplicationId> client = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode())) {
+				LOG.info("Job has been submitted with JobID {}", jobGraph.getJobID());
+				// NOTE(review): 'client' is closed by the enclosing try-with-resources when this method
+				// returns, yet the JobClientImpl handed back wraps it; confirm it is not used afterwards.
 				return CompletableFuture.completedFuture(new JobClientImpl<>(client, jobGraph.getJobID()));
 			}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutorFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java
similarity index 60%
rename from flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutorFactory.java
rename to flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java
index 5e10984..408a819 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutorFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java
@@ -16,42 +16,28 @@
  * limitations under the License.
  */
 
-package org.apache.flink.client.deployment.executors;
+package org.apache.flink.yarn.executors;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.client.deployment.ClusterClientServiceLoader;
-import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
-import org.apache.flink.configuration.ClusterMode;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.ExecutorFactory;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
- * An {@link ExecutorFactory} for executing jobs on dedicated (per-job) clusters.
+ * An {@link ExecutorFactory} for executing jobs on dedicated (per-job) YARN clusters.
  */
 @Internal
-public class JobClusterExecutorFactory implements ExecutorFactory {
-
-	private final ClusterClientServiceLoader clusterClientServiceLoader;
-
-	public JobClusterExecutorFactory() {
-		this(new DefaultClusterClientServiceLoader());
-	}
-
-	public JobClusterExecutorFactory(final ClusterClientServiceLoader clusterClientServiceLoader) {
-		this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
-	}
+public class YarnJobClusterExecutorFactory implements ExecutorFactory {
 
 	@Override
 	public boolean isCompatibleWith(Configuration configuration) {
-		return configuration.get(DeploymentOptions.CLUSTER_MODE).equals(ClusterMode.PER_JOB);
+		// Call equalsIgnoreCase on the constant so that an unset (null) target yields false instead of an NPE.
+		return YarnJobClusterExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));
 	}
 
 	@Override
 	public Executor getExecutor(Configuration configuration) {
-		return new JobClusterExecutor<>(clusterClientServiceLoader);
+		return new YarnJobClusterExecutor();
 	}
 }
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
similarity index 74%
rename from flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java
rename to flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
index b7eeb68..dd15d1b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
@@ -16,20 +16,22 @@
  * limitations under the License.
  */
 
-package org.apache.flink.client.deployment.executors;
+package org.apache.flink.yarn.executors;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.FlinkPipelineTranslationUtil;
 import org.apache.flink.client.cli.ExecutionConfigAccessor;
-import org.apache.flink.client.deployment.ClusterClientFactory;
-import org.apache.flink.client.deployment.ClusterClientServiceLoader;
-import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.yarn.YarnClusterClientFactory;
+import org.apache.flink.yarn.YarnClusterDescriptor;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 
 import java.net.URL;
 import java.util.List;
@@ -41,12 +43,16 @@ import static org.apache.flink.util.Preconditions.checkState;
 /**
- * The {@link Executor} to be used when executing a job on an already running cluster.
+ * The {@link Executor} to be used when executing a job on an already running YARN session cluster.
  */
-public class SessionClusterExecutor<ClusterID> implements Executor {
+@Internal
+public class YarnSessionClusterExecutor implements Executor {
+
+	/** Name of this executor, matched case-insensitively against the configured deployment target. */
+	public static final String NAME = "yarn-session-cluster";
 
-	private final ClusterClientServiceLoader clusterClientServiceLoader;
+	private final YarnClusterClientFactory clusterClientFactory;
 
-	public SessionClusterExecutor(final ClusterClientServiceLoader clusterClientServiceLoader) {
-		this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
+	public YarnSessionClusterExecutor() {
+		this.clusterClientFactory = new YarnClusterClientFactory();
 	}
 
 	@Override
@@ -58,13 +64,11 @@ public class SessionClusterExecutor<ClusterID> implements Executor {
 
 		final JobGraph jobGraph = getJobGraph(pipeline, configuration, classpaths, dependencies);
 
-		final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(configuration);
-
-		try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
-			final ClusterID clusterID = clusterClientFactory.getClusterId(configuration);
+		try (final YarnClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
+			final ApplicationId clusterID = clusterClientFactory.getClusterId(configuration);
 			checkState(clusterID != null);
 
-			try (final ClusterClient<ClusterID> clusterClient = clusterDescriptor.retrieve(clusterID)) {
+			try (final ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve(clusterID)) {
 				return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
 			}
 		}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutorFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java
similarity index 59%
rename from flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutorFactory.java
rename to flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java
index 8e5a5eb..5b6b847 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutorFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java
@@ -16,42 +16,28 @@
  * limitations under the License.
  */
 
-package org.apache.flink.client.deployment.executors;
+package org.apache.flink.yarn.executors;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.client.deployment.ClusterClientServiceLoader;
-import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
-import org.apache.flink.configuration.ClusterMode;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.ExecutorFactory;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
- * An {@link ExecutorFactory} for executing jobs on an existing (session) cluster.
+ * An {@link ExecutorFactory} for executing jobs on an existing YARN session cluster.
  */
 @Internal
-public class SessionClusterExecutorFactory implements ExecutorFactory {
-
-	private final ClusterClientServiceLoader clusterClientServiceLoader;
-
-	public SessionClusterExecutorFactory() {
-		this(new DefaultClusterClientServiceLoader());
-	}
-
-	public SessionClusterExecutorFactory(final ClusterClientServiceLoader clusterClientServiceLoader) {
-		this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
-	}
+public class YarnSessionClusterExecutorFactory implements ExecutorFactory {
 
 	@Override
 	public boolean isCompatibleWith(Configuration configuration) {
-		return configuration.get(DeploymentOptions.CLUSTER_MODE).equals(ClusterMode.SESSION);
+		// Call equalsIgnoreCase on the constant so that an unset (null) target yields false instead of an NPE.
+		return YarnSessionClusterExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));
 	}
 
 	@Override
 	public Executor getExecutor(Configuration configuration) {
-		return new SessionClusterExecutor<>(clusterClientServiceLoader);
+		return new YarnSessionClusterExecutor();
 	}
 }
diff --git a/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory b/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
similarity index 84%
copy from flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
copy to flink-yarn/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
index 870c57d..d56f8c5 100644
--- a/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
+++ b/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
@@ -13,5 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.client.deployment.executors.JobClusterExecutorFactory
-org.apache.flink.client.deployment.executors.SessionClusterExecutorFactory
\ No newline at end of file
+org.apache.flink.yarn.executors.YarnJobClusterExecutorFactory
+org.apache.flink.yarn.executors.YarnSessionClusterExecutorFactory