You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/17 14:45:03 UTC
[flink] 10/14: [FLINK-XXXXX] Add standalone/yarn executors and their factories
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git
commit aee9018a21895344936974b2ebdd6262da65ad20
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Sat Nov 16 21:32:08 2019 +0100
[FLINK-XXXXX] Add standalone/yarn executors and their factories
---
....java => StandaloneSessionClusterExecutor.java} | 28 +++++++++++----------
...> StandaloneSessionClusterExecutorFactory.java} | 21 +++-------------
...org.apache.flink.core.execution.ExecutorFactory | 3 +--
.../yarn/executors/YarnJobClusterExecutor.java | 28 ++++++++++++---------
.../executors/YarnJobClusterExecutorFactory.java | 23 +++--------------
.../yarn/executors/YarnSessionClusterExecutor.java | 29 ++++++++++++----------
.../YarnSessionClusterExecutorFactory.java | 23 +++--------------
...org.apache.flink.core.execution.ExecutorFactory | 4 +--
8 files changed, 61 insertions(+), 98 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
similarity index 74%
copy from flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java
copy to flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
index b7eeb68..e5cc82e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
@@ -18,14 +18,15 @@
package org.apache.flink.client.deployment.executors;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.FlinkPipelineTranslationUtil;
import org.apache.flink.client.cli.ExecutionConfigAccessor;
-import org.apache.flink.client.deployment.ClusterClientFactory;
-import org.apache.flink.client.deployment.ClusterClientServiceLoader;
-import org.apache.flink.client.deployment.ClusterDescriptor;
-import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.deployment.StandaloneClientFactory;
+import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.Executor;
import org.apache.flink.core.execution.JobClient;
@@ -41,12 +42,15 @@ import static org.apache.flink.util.Preconditions.checkState;
/**
* The {@link Executor} to be used when executing a job on an already running cluster.
*/
-public class SessionClusterExecutor<ClusterID> implements Executor {
+@Internal
+public class StandaloneSessionClusterExecutor implements Executor {
- private final ClusterClientServiceLoader clusterClientServiceLoader;
+ public static final String NAME = "standalone-session-cluster";
- public SessionClusterExecutor(final ClusterClientServiceLoader clusterClientServiceLoader) {
- this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
+ private final StandaloneClientFactory clusterClientFactory;
+
+ public StandaloneSessionClusterExecutor() {
+ this.clusterClientFactory = new StandaloneClientFactory();
}
@Override
@@ -58,13 +62,11 @@ public class SessionClusterExecutor<ClusterID> implements Executor {
final JobGraph jobGraph = getJobGraph(pipeline, configuration, classpaths, dependencies);
- final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(configuration);
-
- try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
- final ClusterID clusterID = clusterClientFactory.getClusterId(configuration);
+ try (final StandaloneClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
+ final StandaloneClusterId clusterID = clusterClientFactory.getClusterId(configuration);
checkState(clusterID != null);
- try (final ClusterClient<ClusterID> clusterClient = clusterDescriptor.retrieve(clusterID)) {
+ try (final RestClusterClient<StandaloneClusterId> clusterClient = clusterDescriptor.retrieve(clusterID)) {
return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
}
}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutorFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutorFactory.java
similarity index 62%
copy from flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutorFactory.java
copy to flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutorFactory.java
index 8e5a5eb..06dd451 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutorFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutorFactory.java
@@ -19,39 +19,24 @@
package org.apache.flink.client.deployment.executors;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.client.deployment.ClusterClientServiceLoader;
-import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
-import org.apache.flink.configuration.ClusterMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.Executor;
import org.apache.flink.core.execution.ExecutorFactory;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
/**
* An {@link ExecutorFactory} for executing jobs on an existing (session) cluster.
*/
@Internal
-public class SessionClusterExecutorFactory implements ExecutorFactory {
-
- private final ClusterClientServiceLoader clusterClientServiceLoader;
-
- public SessionClusterExecutorFactory() {
- this(new DefaultClusterClientServiceLoader());
- }
-
- public SessionClusterExecutorFactory(final ClusterClientServiceLoader clusterClientServiceLoader) {
- this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
- }
+public class StandaloneSessionClusterExecutorFactory implements ExecutorFactory {
@Override
public boolean isCompatibleWith(Configuration configuration) {
- return configuration.get(DeploymentOptions.CLUSTER_MODE).equals(ClusterMode.SESSION);
+ return configuration.get(DeploymentOptions.TARGET).equalsIgnoreCase(StandaloneSessionClusterExecutor.NAME);
}
@Override
public Executor getExecutor(Configuration configuration) {
- return new SessionClusterExecutor<>(clusterClientServiceLoader);
+ return new StandaloneSessionClusterExecutor();
}
}
diff --git a/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory b/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
index 870c57d..d9b144f 100644
--- a/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
+++ b/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
@@ -13,5 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.flink.client.deployment.executors.JobClusterExecutorFactory
-org.apache.flink.client.deployment.executors.SessionClusterExecutorFactory
\ No newline at end of file
+org.apache.flink.client.deployment.executors.StandaloneSessionClusterExecutorFactory
\ No newline at end of file
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
similarity index 75%
rename from flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutor.java
rename to flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
index 902d7fd..09094e7 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
@@ -16,21 +16,23 @@
* limitations under the License.
*/
-package org.apache.flink.client.deployment.executors;
+package org.apache.flink.yarn.executors;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.FlinkPipelineTranslationUtil;
import org.apache.flink.client.cli.ExecutionConfigAccessor;
-import org.apache.flink.client.deployment.ClusterClientFactory;
-import org.apache.flink.client.deployment.ClusterClientServiceLoader;
-import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.deployment.executors.JobClientImpl;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.Executor;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.yarn.YarnClusterClientFactory;
+import org.apache.flink.yarn.YarnClusterDescriptor;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,21 +47,23 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* This executor will start a cluster specifically for the job at hand and
* tear it down when the job is finished either successfully or due to an error.
*/
-public class JobClusterExecutor<ClusterID> implements Executor {
+@Internal
+public class YarnJobClusterExecutor implements Executor {
- private static final Logger LOG = LoggerFactory.getLogger(JobClusterExecutor.class);
+ private static final Logger LOG = LoggerFactory.getLogger(YarnJobClusterExecutor.class);
- private final ClusterClientServiceLoader clusterClientServiceLoader;
+ public static final String NAME = "yarn-job-cluster";
- public JobClusterExecutor(final ClusterClientServiceLoader clusterClientServiceLoader) {
- this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
+ private final YarnClusterClientFactory clusterClientFactory;
+
+ public YarnJobClusterExecutor() {
+ this.clusterClientFactory = new YarnClusterClientFactory();
}
@Override
public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration executionConfig) throws Exception {
- final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(executionConfig);
- try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(executionConfig)) {
+ try (final YarnClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(executionConfig)) {
final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(executionConfig);
final List<URL> dependencies = configAccessor.getJars();
@@ -69,7 +73,7 @@ public class JobClusterExecutor<ClusterID> implements Executor {
final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executionConfig);
- try (final ClusterClient<ClusterID> client = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode())) {
+ try (final ClusterClient<ApplicationId> client = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode())) {
LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());
return CompletableFuture.completedFuture(new JobClientImpl<>(client, jobGraph.getJobID()));
}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutorFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java
similarity index 60%
rename from flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutorFactory.java
rename to flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java
index 5e10984..408a819 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutorFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java
@@ -16,42 +16,27 @@
* limitations under the License.
*/
-package org.apache.flink.client.deployment.executors;
+package org.apache.flink.yarn.executors;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.client.deployment.ClusterClientServiceLoader;
-import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
-import org.apache.flink.configuration.ClusterMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.Executor;
import org.apache.flink.core.execution.ExecutorFactory;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
/**
* An {@link ExecutorFactory} for executing jobs on dedicated (per-job) clusters.
*/
@Internal
-public class JobClusterExecutorFactory implements ExecutorFactory {
-
- private final ClusterClientServiceLoader clusterClientServiceLoader;
-
- public JobClusterExecutorFactory() {
- this(new DefaultClusterClientServiceLoader());
- }
-
- public JobClusterExecutorFactory(final ClusterClientServiceLoader clusterClientServiceLoader) {
- this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
- }
+public class YarnJobClusterExecutorFactory implements ExecutorFactory {
@Override
public boolean isCompatibleWith(Configuration configuration) {
- return configuration.get(DeploymentOptions.CLUSTER_MODE).equals(ClusterMode.PER_JOB);
+ return configuration.get(DeploymentOptions.TARGET).equalsIgnoreCase(YarnJobClusterExecutor.NAME);
}
@Override
public Executor getExecutor(Configuration configuration) {
- return new JobClusterExecutor<>(clusterClientServiceLoader);
+ return new YarnJobClusterExecutor();
}
}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
similarity index 74%
rename from flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java
rename to flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
index b7eeb68..dd15d1b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
@@ -16,20 +16,22 @@
* limitations under the License.
*/
-package org.apache.flink.client.deployment.executors;
+package org.apache.flink.yarn.executors;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.FlinkPipelineTranslationUtil;
import org.apache.flink.client.cli.ExecutionConfigAccessor;
-import org.apache.flink.client.deployment.ClusterClientFactory;
-import org.apache.flink.client.deployment.ClusterClientServiceLoader;
-import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.Executor;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.yarn.YarnClusterClientFactory;
+import org.apache.flink.yarn.YarnClusterDescriptor;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import java.net.URL;
import java.util.List;
@@ -41,12 +43,15 @@ import static org.apache.flink.util.Preconditions.checkState;
/**
* The {@link Executor} to be used when executing a job on an already running cluster.
*/
-public class SessionClusterExecutor<ClusterID> implements Executor {
+@Internal
+public class YarnSessionClusterExecutor implements Executor {
+
+ public static final String NAME = "yarn-session-cluster";
- private final ClusterClientServiceLoader clusterClientServiceLoader;
+ private final YarnClusterClientFactory clusterClientFactory;
- public SessionClusterExecutor(final ClusterClientServiceLoader clusterClientServiceLoader) {
- this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
+ public YarnSessionClusterExecutor() {
+ this.clusterClientFactory = new YarnClusterClientFactory();
}
@Override
@@ -58,13 +63,11 @@ public class SessionClusterExecutor<ClusterID> implements Executor {
final JobGraph jobGraph = getJobGraph(pipeline, configuration, classpaths, dependencies);
- final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(configuration);
-
- try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
- final ClusterID clusterID = clusterClientFactory.getClusterId(configuration);
+ try (final YarnClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
+ final ApplicationId clusterID = clusterClientFactory.getClusterId(configuration);
checkState(clusterID != null);
- try (final ClusterClient<ClusterID> clusterClient = clusterDescriptor.retrieve(clusterID)) {
+ try (final ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve(clusterID)) {
return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
}
}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutorFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java
similarity index 59%
rename from flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutorFactory.java
rename to flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java
index 8e5a5eb..5b6b847 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutorFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java
@@ -16,42 +16,27 @@
* limitations under the License.
*/
-package org.apache.flink.client.deployment.executors;
+package org.apache.flink.yarn.executors;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.client.deployment.ClusterClientServiceLoader;
-import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
-import org.apache.flink.configuration.ClusterMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.Executor;
import org.apache.flink.core.execution.ExecutorFactory;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
/**
* An {@link ExecutorFactory} for executing jobs on an existing (session) cluster.
*/
@Internal
-public class SessionClusterExecutorFactory implements ExecutorFactory {
-
- private final ClusterClientServiceLoader clusterClientServiceLoader;
-
- public SessionClusterExecutorFactory() {
- this(new DefaultClusterClientServiceLoader());
- }
-
- public SessionClusterExecutorFactory(final ClusterClientServiceLoader clusterClientServiceLoader) {
- this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
- }
+public class YarnSessionClusterExecutorFactory implements ExecutorFactory {
@Override
public boolean isCompatibleWith(Configuration configuration) {
- return configuration.get(DeploymentOptions.CLUSTER_MODE).equals(ClusterMode.SESSION);
+ return configuration.get(DeploymentOptions.TARGET).equalsIgnoreCase(YarnSessionClusterExecutor.NAME);
}
@Override
public Executor getExecutor(Configuration configuration) {
- return new SessionClusterExecutor<>(clusterClientServiceLoader);
+ return new YarnSessionClusterExecutor();
}
}
diff --git a/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory b/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
similarity index 84%
copy from flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
copy to flink-yarn/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
index 870c57d..d56f8c5 100644
--- a/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
+++ b/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
@@ -13,5 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.flink.client.deployment.executors.JobClusterExecutorFactory
-org.apache.flink.client.deployment.executors.SessionClusterExecutorFactory
\ No newline at end of file
+org.apache.flink.yarn.executors.YarnJobClusterExecutorFactory
+org.apache.flink.yarn.executors.YarnSessionClusterExecutorFactory
\ No newline at end of file