Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/18 13:10:56 UTC

[flink] branch executor-impl created (now d29867f)

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a change to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git.


      at d29867f  [FLINK-XXXXX] Make DefaultExecutorServiceLoader a singleton.

This branch includes the following new commits:

     new 3ca53c6  [hotfix] Fix parallelism consolidation logic in the (Stream)ExecutionEnvironment.
     new accfbf5  [hotfix] ignore empty yarn properties
     new c058f2f  [hotfix] minor checkstyle fixes
     new c3141f2  [hotfix] Merge configurations in argument list of runProgram() in CliFrontend
     new 325b49f  [hotfix] Simplified the construction of the ContextEnvironment
     new 0109616  [FLINK-XXXXX] Add methods to ClientUtils that do not require userClassloader
     new 7528d5c  [hotfix] code style fix
     new 54a277b  [FLINK-XXXXX] Change Executor.execute() signature + add Session and Job Cluster Executors
     new f1c3690  [FLINK-XXXXX] Add the ExecutorFactories for Session and Per-Job
     new e416017  [hotfix] Fix isCompatibleWith of ClusterClientFactories to ignore the case.
     new f46bb96  [FLINK-XXXXX] Add standalone/yarn executors and their factories
     new c08abac  [FLINK-XXXXX] Update the DeploymentTarget setting
     new b771f25  Update ContextEnvironments + Deactivated test!!! TO RE-ACTIVATE
     new df355d9  Wired everything together
     new 8e74ecd  [FLINK-XXXXX] Fix job client lifecycle issue
     new 75860ea  removed notifications
     new 3330a39  [hotfix] Annotate ClusterClientFactories as @Internal
     new 8212ba8  [FLINK-XXXXX] Deduplicate the Executor code and fix javadocs.
     new d29867f  [FLINK-XXXXX] Make DefaultExecutorServiceLoader a singleton.

The 19 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[flink] 12/19: [FLINK-XXXXX] Update the DeploymentTarget setting

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c08abac7556e7e63b092dd37d46c9e73df2697cf
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Sat Nov 16 22:24:26 2019 +0100

    [FLINK-XXXXX] Update the DeploymentTarget setting
---
 .../flink/client/cli/AbstractCustomCommandLine.java       |  3 ++-
 .../flink/client/deployment/StandaloneClientFactory.java  |  3 ++-
 .../client/deployment/ClusterClientServiceLoaderTest.java |  3 ++-
 .../org/apache/flink/yarn/YarnClusterClientFactory.java   |  6 +++++-
 .../org/apache/flink/yarn/cli/FlinkYarnSessionCli.java    |  6 +++++-
 .../apache/flink/yarn/YarnClusterClientFactoryTest.java   | 15 +++++++++++++--
 6 files changed, 29 insertions(+), 7 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
index f32d4f8..b8431cf 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.client.cli;
 
+import org.apache.flink.client.deployment.executors.StandaloneSessionClusterExecutor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -73,7 +74,7 @@ public abstract class AbstractCustomCommandLine implements CustomCommandLine {
 	@Override
 	public Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) throws FlinkException {
 		final Configuration resultingConfiguration = new Configuration(configuration);
-		resultingConfiguration.setString(DeploymentOptions.TARGET, getId());
+		resultingConfiguration.setString(DeploymentOptions.TARGET, StandaloneSessionClusterExecutor.NAME);
 
 		if (commandLine.hasOption(addressOption.getOpt())) {
 			String addressWithPort = commandLine.getOptionValue(addressOption.getOpt());
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java
index b10204b..9597ff8 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.client.deployment;
 
+import org.apache.flink.client.deployment.executors.StandaloneSessionClusterExecutor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 
@@ -35,7 +36,7 @@ public class StandaloneClientFactory implements ClusterClientFactory<StandaloneC
 	@Override
 	public boolean isCompatibleWith(Configuration configuration) {
 		checkNotNull(configuration);
-		return ID.equalsIgnoreCase(configuration.getString(DeploymentOptions.TARGET));
+		return StandaloneSessionClusterExecutor.NAME.equalsIgnoreCase(configuration.getString(DeploymentOptions.TARGET));
 	}
 
 	@Override
diff --git a/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java b/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java
index a084021..b7a9953 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.client.deployment;
 
+import org.apache.flink.client.deployment.executors.StandaloneSessionClusterExecutor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 
@@ -57,7 +58,7 @@ public class ClusterClientServiceLoaderTest {
 	@Test
 	public void testStandaloneClusterClientFactoryDiscovery() {
 		final Configuration config = new Configuration();
-		config.setString(DeploymentOptions.TARGET, StandaloneClientFactory.ID);
+		config.setString(DeploymentOptions.TARGET, StandaloneSessionClusterExecutor.NAME);
 
 		ClusterClientFactory<StandaloneClusterId> factory = serviceLoaderUnderTest.getClusterClientFactory(config);
 		assertTrue(factory instanceof StandaloneClientFactory);
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
index 7605470..ad391d8 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
@@ -25,6 +25,8 @@ import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.executors.YarnJobClusterExecutor;
+import org.apache.flink.yarn.executors.YarnSessionClusterExecutor;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.client.api.YarnClient;
@@ -45,7 +47,9 @@ public class YarnClusterClientFactory implements ClusterClientFactory<Applicatio
 	@Override
 	public boolean isCompatibleWith(Configuration configuration) {
 		checkNotNull(configuration);
-		return ID.equalsIgnoreCase(configuration.getString(DeploymentOptions.TARGET));
+		final String deploymentTarget = configuration.getString(DeploymentOptions.TARGET);
+		return YarnJobClusterExecutor.NAME.equalsIgnoreCase(deploymentTarget) ||
+				YarnSessionClusterExecutor.NAME.equalsIgnoreCase(deploymentTarget);
 	}
 
 	@Override
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 362deda..388dea0 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -47,6 +47,8 @@ import org.apache.flink.yarn.YarnClusterClientFactory;
 import org.apache.flink.yarn.YarnClusterDescriptor;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
+import org.apache.flink.yarn.executors.YarnJobClusterExecutor;
+import org.apache.flink.yarn.executors.YarnSessionClusterExecutor;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.HelpFormatter;
@@ -346,7 +348,6 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
 	public Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) throws FlinkException {
 		// we ignore the addressOption because it can only contain "yarn-cluster"
 		final Configuration effectiveConfiguration = new Configuration(configuration);
-		effectiveConfiguration.setString(DeploymentOptions.TARGET, getId());
 
 		applyDescriptorOptionToConfig(commandLine, effectiveConfiguration);
 
@@ -361,6 +362,9 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
 
 			effectiveConfiguration.setString(HA_CLUSTER_ID, zooKeeperNamespace);
 			effectiveConfiguration.setString(YarnConfigOptions.APPLICATION_ID, ConverterUtils.toString(applicationId));
+			effectiveConfiguration.setString(DeploymentOptions.TARGET, YarnSessionClusterExecutor.NAME);
+		} else {
+			effectiveConfiguration.setString(DeploymentOptions.TARGET, YarnJobClusterExecutor.NAME);
 		}
 
 		if (commandLine.hasOption(jmMemory.getOpt())) {
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java
index 931313a..508c11e 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java
@@ -23,6 +23,8 @@ import org.apache.flink.client.deployment.ClusterClientServiceLoader;
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.yarn.executors.YarnJobClusterExecutor;
+import org.apache.flink.yarn.executors.YarnSessionClusterExecutor;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.junit.Test;
@@ -35,9 +37,18 @@ import static org.junit.Assert.assertTrue;
 public class YarnClusterClientFactoryTest {
 
 	@Test
-	public void testYarnClusterClientFactoryDiscovery() {
+	public void testYarnClusterClientFactoryDiscoveryWithPerJobExecutor() {
+		testYarnClusterClientFactoryDiscoveryHelper(YarnJobClusterExecutor.NAME);
+	}
+
+	@Test
+	public void testYarnClusterClientFactoryDiscoveryWithSessionExecutor() {
+		testYarnClusterClientFactoryDiscoveryHelper(YarnSessionClusterExecutor.NAME);
+	}
+
+	private void testYarnClusterClientFactoryDiscoveryHelper(final String targetName) {
 		final Configuration configuration = new Configuration();
-		configuration.setString(DeploymentOptions.TARGET, YarnClusterClientFactory.ID);
+		configuration.setString(DeploymentOptions.TARGET, targetName);
 
 		final ClusterClientServiceLoader serviceLoader = new DefaultClusterClientServiceLoader();
 		final ClusterClientFactory<ApplicationId> factory = serviceLoader.getClusterClientFactory(configuration);
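
For orientation: after this commit the DeploymentOptions.TARGET value is the single switch that decides which ClusterClientFactory (and, later in this branch, which executor) is picked. Below is a minimal sketch of that selection path, modelled on the test added above; the classes and option names are taken from the diff, while the main() scaffolding is illustrative only.

    import org.apache.flink.client.deployment.ClusterClientFactory;
    import org.apache.flink.client.deployment.ClusterClientServiceLoader;
    import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.DeploymentOptions;
    import org.apache.flink.yarn.executors.YarnSessionClusterExecutor;

    import org.apache.hadoop.yarn.api.records.ApplicationId;

    public class TargetSelectionSketch {
        public static void main(String[] args) {
            // Point the deployment target at the YARN session executor name ...
            final Configuration configuration = new Configuration();
            configuration.setString(DeploymentOptions.TARGET, YarnSessionClusterExecutor.NAME);

            // ... and let the service loader find the factory whose
            // isCompatibleWith() accepts that target.
            final ClusterClientServiceLoader serviceLoader = new DefaultClusterClientServiceLoader();
            final ClusterClientFactory<ApplicationId> factory = serviceLoader.getClusterClientFactory(configuration);
            System.out.println("Selected factory: " + factory.getClass().getSimpleName());
        }
    }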


[flink] 10/19: [hotfix] Fix isCompatibleWith of ClusterClientFactories to ignore the case.

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e416017231026e804978b40fd9fe4c485d559176
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Sat Nov 16 20:59:59 2019 +0100

    [hotfix] Fix isCompatibleWith of ClusterClientFactories to ignore the case.
---
 .../org/apache/flink/client/deployment/StandaloneClientFactory.java     | 2 +-
 .../src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java
index b441a63..b10204b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java
@@ -35,7 +35,7 @@ public class StandaloneClientFactory implements ClusterClientFactory<StandaloneC
 	@Override
 	public boolean isCompatibleWith(Configuration configuration) {
 		checkNotNull(configuration);
-		return ID.equals(configuration.getString(DeploymentOptions.TARGET));
+		return ID.equalsIgnoreCase(configuration.getString(DeploymentOptions.TARGET));
 	}
 
 	@Override
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
index aa138a7..7605470 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
@@ -45,7 +45,7 @@ public class YarnClusterClientFactory implements ClusterClientFactory<Applicatio
 	@Override
 	public boolean isCompatibleWith(Configuration configuration) {
 		checkNotNull(configuration);
-		return ID.equals(configuration.getString(DeploymentOptions.TARGET));
+		return ID.equalsIgnoreCase(configuration.getString(DeploymentOptions.TARGET));
 	}
 
 	@Override
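
For reference, a tiny standalone illustration of the behavioral change; the "yarn-cluster" literal is the ID constant defined on YarnClusterClientFactory elsewhere in this branch, and the wrapper class below is not Flink code.

    public class CaseInsensitiveTargetCheck {
        public static void main(String[] args) {
            final String id = "yarn-cluster";
            // Before this hotfix, an upper- or mixed-case target missed the factory:
            System.out.println(id.equals("YARN-CLUSTER"));           // false
            // After the hotfix, the comparison tolerates the user's casing:
            System.out.println(id.equalsIgnoreCase("YARN-CLUSTER")); // true
        }
    }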


[flink] 09/19: [FLINK-XXXXX] Add the ExecutorFactories for Session and Per-Job

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f1c36904bc52579ec50877717f8a98addc87c136
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Tue Nov 5 13:31:56 2019 +0100

    [FLINK-XXXXX] Add the ExecutorFactories for Session and Per-Job
---
 .../executors/JobClusterExecutorFactory.java       | 57 ++++++++++++++++++++++
 .../executors/SessionClusterExecutorFactory.java   | 57 ++++++++++++++++++++++
 ...org.apache.flink.core.execution.ExecutorFactory | 17 +++++++
 3 files changed, 131 insertions(+)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutorFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutorFactory.java
new file mode 100644
index 0000000..5e10984
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutorFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.executors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
+import org.apache.flink.configuration.ClusterMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.ExecutorFactory;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An {@link ExecutorFactory} for executing jobs on dedicated (per-job) clusters.
+ */
+@Internal
+public class JobClusterExecutorFactory implements ExecutorFactory {
+
+	private final ClusterClientServiceLoader clusterClientServiceLoader;
+
+	public JobClusterExecutorFactory() {
+		this(new DefaultClusterClientServiceLoader());
+	}
+
+	public JobClusterExecutorFactory(final ClusterClientServiceLoader clusterClientServiceLoader) {
+		this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
+	}
+
+	@Override
+	public boolean isCompatibleWith(Configuration configuration) {
+		return configuration.get(DeploymentOptions.CLUSTER_MODE).equals(ClusterMode.PER_JOB);
+	}
+
+	@Override
+	public Executor getExecutor(Configuration configuration) {
+		return new JobClusterExecutor<>(clusterClientServiceLoader);
+	}
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutorFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutorFactory.java
new file mode 100644
index 0000000..8e5a5eb
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutorFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.executors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
+import org.apache.flink.configuration.ClusterMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.ExecutorFactory;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An {@link ExecutorFactory} for executing jobs on an existing (session) cluster.
+ */
+@Internal
+public class SessionClusterExecutorFactory implements ExecutorFactory {
+
+	private final ClusterClientServiceLoader clusterClientServiceLoader;
+
+	public SessionClusterExecutorFactory() {
+		this(new DefaultClusterClientServiceLoader());
+	}
+
+	public SessionClusterExecutorFactory(final ClusterClientServiceLoader clusterClientServiceLoader) {
+		this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
+	}
+
+	@Override
+	public boolean isCompatibleWith(Configuration configuration) {
+		return configuration.get(DeploymentOptions.CLUSTER_MODE).equals(ClusterMode.SESSION);
+	}
+
+	@Override
+	public Executor getExecutor(Configuration configuration) {
+		return new SessionClusterExecutor<>(clusterClientServiceLoader);
+	}
+}
diff --git a/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory b/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
new file mode 100644
index 0000000..870c57d
--- /dev/null
+++ b/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.client.deployment.executors.JobClusterExecutorFactory
+org.apache.flink.client.deployment.executors.SessionClusterExecutorFactory
\ No newline at end of file
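
The resource file above registers the two factories for service discovery. A hedged sketch of how such entries are typically consumed, assuming the loading side goes through java.util.ServiceLoader (which DefaultExecutorServiceLoader appears to do); the wrapper class is illustrative only.

    import java.util.ServiceLoader;

    import org.apache.flink.core.execution.ExecutorFactory;

    public class ExecutorFactoryDiscoverySketch {
        public static void main(String[] args) {
            // Every factory listed in META-INF/services/org.apache.flink.core.execution.ExecutorFactory
            // on the classpath is instantiated and handed back by the loader.
            for (ExecutorFactory factory : ServiceLoader.load(ExecutorFactory.class)) {
                System.out.println("Discovered executor factory: " + factory.getClass().getName());
            }
        }
    }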


[flink] 16/19: removed notifications

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 75860ea1b0b868b1e29de79fd1655363ed7d6473
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Sun Nov 17 15:44:26 2019 +0100

    removed notifications
---
 .travis.yml | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/.travis.yml b/.travis.yml
index 65cec31..e6ffbb5 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -63,6 +63,9 @@ notifications:
     rooms:
       - secure: ikPQn5JTpkyzxVyOPm/jIl3FPm6hY8xAdG4pSwxGWjBqF+NmmNTp9YZsJ6fD8xPql6T5n1hNDbZSC14jVUw/vvXGvibDXLN+06f25ZQl+4LJBXaiR7gTG6y3nO8G90Vw7XpvCme6n5Md9tvjygb17a4FEgRJFfwzWnnyPA1yvK0=
     on_success: never
+    on_failure: never
+    on_cancel: never
+    on_error: never
     on_pull_requests: false
   webhooks:
     urls:


[flink] 05/19: [hotfix] Simplified the construction of the ContextEnvironment

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 325b49fd368adb63097b165782d07488be705103
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Thu Nov 14 16:46:35 2019 +0100

    [hotfix] Simplified the construction of the ContextEnvironment
---
 .../java/org/apache/flink/client/ClientUtils.java  | 18 ++-----
 .../flink/client/program/ContextEnvironment.java   | 42 ++++++++++------
 .../client/program/ContextEnvironmentFactory.java  | 58 +++++++---------------
 3 files changed, 49 insertions(+), 69 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index 5e53bc3..2c80236 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -33,7 +33,6 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.util.ExceptionUtils;
 
@@ -154,9 +153,6 @@ public enum ClientUtils {
 
 		final List<URL> jobJars = executionConfigAccessor.getJars();
 		final List<URL> classpaths = executionConfigAccessor.getClasspaths();
-		final SavepointRestoreSettings savepointSettings = executionConfigAccessor.getSavepointRestoreSettings();
-		final int parallelism = executionConfigAccessor.getParallelism();
-		final boolean detached = executionConfigAccessor.getDetachedMode();
 
 		final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
 
@@ -164,19 +160,15 @@ public enum ClientUtils {
 		try {
 			Thread.currentThread().setContextClassLoader(userCodeClassLoader);
 
-			LOG.info("Starting program (detached: {})", detached);
+			LOG.info("Starting program (detached: {})", executionConfigAccessor.getDetachedMode());
 
 			final AtomicReference<JobExecutionResult> jobExecutionResult = new AtomicReference<>();
 
 			ContextEnvironmentFactory factory = new ContextEnvironmentFactory(
-				client,
-				jobJars,
-				classpaths,
-				userCodeClassLoader,
-				parallelism,
-				detached,
-				savepointSettings,
-				jobExecutionResult);
+					configuration,
+					client,
+					userCodeClassLoader,
+					jobExecutionResult);
 			ContextEnvironment.setAsContext(factory);
 
 			try {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 08a02af..9d3927a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -25,6 +25,8 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.FlinkPipelineTranslationUtil;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 
@@ -32,6 +34,8 @@ import java.net.URL;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Execution Environment for remote execution with the Client.
  */
@@ -54,23 +58,29 @@ public class ContextEnvironment extends ExecutionEnvironment {
 	private boolean alreadyCalled;
 
 	public ContextEnvironment(
-		ClusterClient<?> remoteConnection,
-		List<URL> jarFiles,
-		List<URL> classpaths,
-		ClassLoader userCodeClassLoader,
-		SavepointRestoreSettings savepointSettings,
-		boolean detached,
-		AtomicReference<JobExecutionResult> jobExecutionResult) {
-		this.client = remoteConnection;
-		this.jarFilesToAttach = jarFiles;
-		this.classpathsToAttach = classpaths;
-		this.userCodeClassLoader = userCodeClassLoader;
-		this.savepointSettings = savepointSettings;
-
-		this.detached = detached;
-		this.alreadyCalled = false;
+			final Configuration configuration,
+			final ClusterClient<?> remoteConnection,
+			final ClassLoader userCodeClassLoader,
+			final AtomicReference<JobExecutionResult> jobExecutionResult) {
+
+		final ExecutionConfigAccessor accessor = ExecutionConfigAccessor
+				.fromConfiguration(checkNotNull(configuration));
+
+		this.jarFilesToAttach = accessor.getJars();
+		this.classpathsToAttach = accessor.getClasspaths();
+		this.savepointSettings = accessor.getSavepointRestoreSettings();
+		this.detached = accessor.getDetachedMode();
+
+		final int parallelism = accessor.getParallelism();
+		if (parallelism > 0) {
+			setParallelism(parallelism);
+		}
 
-		this.jobExecutionResult = jobExecutionResult;
+		this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
+		this.jobExecutionResult = checkNotNull(jobExecutionResult);
+		this.client = checkNotNull(remoteConnection);
+
+		this.alreadyCalled = false;
 	}
 
 	@Override
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
index ff7f15b..f1c9ad6 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
@@ -22,12 +22,13 @@ import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
 
-import java.net.URL;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * The factory that instantiates the environment to be used when running jobs that are
  * submitted through a pre-configured client connection.
@@ -35,64 +36,41 @@ import java.util.concurrent.atomic.AtomicReference;
  */
 public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
 
-	private final ClusterClient<?> client;
-
-	private final List<URL> jarFilesToAttach;
+	private final Configuration configuration;
 
-	private final List<URL> classpathsToAttach;
+	private final ClusterClient<?> client;
 
 	private final ClassLoader userCodeClassLoader;
 
-	private final int defaultParallelism;
-
-	private final boolean isDetached;
-
-	private final SavepointRestoreSettings savepointSettings;
-
 	private final AtomicReference<JobExecutionResult> jobExecutionResult;
 
 	private boolean alreadyCalled;
 
 	public ContextEnvironmentFactory(
-		ClusterClient<?> client,
-		List<URL> jarFilesToAttach,
-		List<URL> classpathsToAttach,
-		ClassLoader userCodeClassLoader,
-		int defaultParallelism,
-		boolean isDetached,
-		SavepointRestoreSettings savepointSettings,
-		AtomicReference<JobExecutionResult> jobExecutionResult) {
-		this.client = client;
-		this.jarFilesToAttach = jarFilesToAttach;
-		this.classpathsToAttach = classpathsToAttach;
-		this.userCodeClassLoader = userCodeClassLoader;
-		this.defaultParallelism = defaultParallelism;
-		this.isDetached = isDetached;
-		this.savepointSettings = savepointSettings;
+			final Configuration configuration,
+			final ClusterClient<?> client,
+			final ClassLoader userCodeClassLoader,
+			final AtomicReference<JobExecutionResult> jobExecutionResult) {
+
+		this.configuration = checkNotNull(configuration);
+		this.client = checkNotNull(client);
+		this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
+		this.jobExecutionResult = checkNotNull(jobExecutionResult);
 		this.alreadyCalled = false;
-		this.jobExecutionResult = jobExecutionResult;
 	}
 
 	@Override
 	public ExecutionEnvironment createExecutionEnvironment() {
 		verifyCreateIsCalledOnceWhenInDetachedMode();
-
-		final ContextEnvironment environment = new ContextEnvironment(
+		return new ContextEnvironment(
+			configuration,
 			client,
-			jarFilesToAttach,
-			classpathsToAttach,
 			userCodeClassLoader,
-			savepointSettings,
-			isDetached,
 			jobExecutionResult);
-		if (defaultParallelism > 0) {
-			environment.setParallelism(defaultParallelism);
-		}
-		return environment;
 	}
 
 	private void verifyCreateIsCalledOnceWhenInDetachedMode() {
-		if (isDetached && alreadyCalled) {
+		if (!configuration.getBoolean(DeploymentOptions.ATTACHED) && alreadyCalled) {
 			throw new InvalidProgramException("Multiple environments cannot be created in detached mode");
 		}
 		alreadyCalled = true;
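
In short, the refactor threads a single Configuration through and reads the per-job settings back via ExecutionConfigAccessor instead of passing them as individual constructor arguments. A hedged sketch of the read side; the accessor method names are those used in the diff, and the wrapper class is illustrative only.

    import java.net.URL;
    import java.util.List;

    import org.apache.flink.client.cli.ExecutionConfigAccessor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

    class ExecutionSettingsSketch {
        static void printSettings(Configuration configuration) {
            // All per-job settings come from the one Configuration object.
            final ExecutionConfigAccessor accessor = ExecutionConfigAccessor.fromConfiguration(configuration);
            final List<URL> jars = accessor.getJars();
            final List<URL> classpaths = accessor.getClasspaths();
            final SavepointRestoreSettings savepoints = accessor.getSavepointRestoreSettings();
            System.out.println("jars=" + jars
                    + ", classpaths=" + classpaths
                    + ", detached=" + accessor.getDetachedMode()
                    + ", parallelism=" + accessor.getParallelism()
                    + ", savepoints=" + savepoints);
        }
    }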


[flink] 03/19: [hotfix] minor checkstyle fixes

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c058f2fcaa1ec71da6aa92ae730db27641babfb0
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Thu Nov 14 16:03:42 2019 +0100

    [hotfix] minor checkstyle fixes
---
 .../apache/flink/core/execution/DefaultExecutorServiceLoader.java   | 6 +++---
 .../src/main/java/org/apache/flink/core/execution/Executor.java     | 1 +
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
index 241feab..64c0034 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
@@ -67,12 +67,12 @@ public class DefaultExecutorServiceLoader implements ExecutorServiceLoader {
 		}
 
 		if (compatibleFactories.size() > 1) {
-			final List<String> configStr =
+			final String configStr =
 					configuration.toMap().entrySet().stream()
 							.map(e -> e.getKey() + "=" + e.getValue())
-							.collect(Collectors.toList());
+							.collect(Collectors.joining("\n"));
 
-			throw new IllegalStateException("Multiple compatible client factories found for:\n" + String.join("\n", configStr) + ".");
+			throw new IllegalStateException("Multiple compatible client factories found for:\n" + configStr + ".");
 		}
 
 		return compatibleFactories.isEmpty() ? null : compatibleFactories.get(0);
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
index 3476742..8515f43 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
@@ -29,6 +29,7 @@ public interface Executor {
 
 	/**
 	 * Executes a {@link Pipeline} based on the provided configuration.
+	 *
 	 * @param pipeline the {@link Pipeline} to execute
 	 * @param configuration the {@link Configuration} with the required execution parameters
 	 * @return the {@link JobExecutionResult} corresponding to the pipeline execution.
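
Aside: the DefaultExecutorServiceLoader change above is behavior-preserving; both stream pipelines build the same string, the new one just skips the intermediate list. A standalone before/after comparison (the map content is made up for the example):

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class JoiningSketch {
        public static void main(String[] args) {
            final Map<String, String> conf = new LinkedHashMap<>();
            conf.put("option.a", "1");
            conf.put("option.b", "2");

            // Old form: collect to a List, then join.
            final List<String> parts = conf.entrySet().stream()
                    .map(e -> e.getKey() + "=" + e.getValue())
                    .collect(Collectors.toList());
            final String oldStyle = String.join("\n", parts);

            // New form: join directly while collecting.
            final String newStyle = conf.entrySet().stream()
                    .map(e -> e.getKey() + "=" + e.getValue())
                    .collect(Collectors.joining("\n"));

            System.out.println(oldStyle.equals(newStyle)); // true
        }
    }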


[flink] 06/19: [FLINK-XXXXX] Add methods to ClientUtils that do not require userClassloader

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0109616c3b1b1fdd77fdb5d18785d995b73c2a37
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Fri Nov 15 15:36:55 2019 +0100

    [FLINK-XXXXX] Add methods to ClientUtils that do not require userClassloader
---
 .../java/org/apache/flink/client/ClientUtils.java  | 37 ++++++++++------------
 1 file changed, 17 insertions(+), 20 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index 2c80236..ac247ac 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -19,6 +19,7 @@
 package org.apache.flink.client;
 
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.client.cli.ExecutionConfigAccessor;
 import org.apache.flink.client.program.ClusterClient;
@@ -44,6 +45,7 @@ import java.io.IOException;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.jar.JarFile;
@@ -99,15 +101,20 @@ public enum ClientUtils {
 		return FlinkUserCodeClassLoaders.create(resolveOrder, urls, parent, alwaysParentFirstLoaderPatterns);
 	}
 
-	public static JobExecutionResult submitJob(
-			ClusterClient<?> client,
-			JobGraph jobGraph) throws ProgramInvocationException {
-		checkNotNull(client);
-		checkNotNull(jobGraph);
+	public static CompletableFuture<JobID> submitJobAndGetJobID(ClusterClient<?> client, JobGraph jobGraph) {
+		return checkNotNull(client)
+				.submitJob(checkNotNull(jobGraph))
+				.thenApply(JobSubmissionResult::getJobID);
+	}
+
+	public static CompletableFuture<JobResult> submitJobAndGetResult(ClusterClient<?> client, JobGraph jobGraph) {
+		return submitJobAndGetJobID(client, jobGraph)
+				.thenCompose(client::requestJobResult);
+	}
+
+	public static JobExecutionResult submitJob(ClusterClient<?> client, JobGraph jobGraph) throws ProgramInvocationException {
 		try {
-			return client
-				.submitJob(jobGraph)
-				.thenApply(JobSubmissionResult::getJobID)
+			return submitJobAndGetJobID(client, jobGraph)
 				.thenApply(DetachedJobExecutionResult::new)
 				.get();
 		} catch (InterruptedException | ExecutionException e) {
@@ -120,18 +127,11 @@ public enum ClientUtils {
 			ClusterClient<?> client,
 			JobGraph jobGraph,
 			ClassLoader classLoader) throws ProgramInvocationException {
-		checkNotNull(client);
-		checkNotNull(jobGraph);
 		checkNotNull(classLoader);
 
 		JobResult jobResult;
-
 		try {
-			jobResult = client
-				.submitJob(jobGraph)
-				.thenApply(JobSubmissionResult::getJobID)
-				.thenCompose(client::requestJobResult)
-				.get();
+			jobResult = submitJobAndGetResult(client, jobGraph).get();
 		} catch (InterruptedException | ExecutionException e) {
 			ExceptionUtils.checkInterrupted(e);
 			throw new ProgramInvocationException("Could not run job", jobGraph.getJobID(), e);
@@ -151,12 +151,9 @@ public enum ClientUtils {
 
 		final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
 
-		final List<URL> jobJars = executionConfigAccessor.getJars();
-		final List<URL> classpaths = executionConfigAccessor.getClasspaths();
-
 		final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
-
 		final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+
 		try {
 			Thread.currentThread().setContextClassLoader(userCodeClassLoader);
 


[flink] 17/19: [hotfix] Annotate ClusterClientFactories as @Internal

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3330a3996ed66256c1752906d70418ff8a924adc
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Nov 18 10:09:53 2019 +0100

    [hotfix] Annotate ClusterClientFactories as @Internal
---
 .../java/org/apache/flink/client/deployment/ClusterClientFactory.java   | 2 ++
 .../org/apache/flink/client/deployment/StandaloneClientFactory.java     | 2 ++
 .../src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java   | 2 ++
 3 files changed, 6 insertions(+)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientFactory.java
index 36647b6..3c0c5cc 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.client.deployment;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
 
 import javax.annotation.Nullable;
@@ -25,6 +26,7 @@ import javax.annotation.Nullable;
 /**
  * A factory containing all the necessary information for creating clients to Flink clusters.
  */
+@Internal
 public interface ClusterClientFactory<ClusterID> {
 
 	/**
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java
index 9597ff8..9144025 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.client.deployment;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.client.deployment.executors.StandaloneSessionClusterExecutor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
@@ -29,6 +30,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * A {@link ClusterClientFactory} for a standalone cluster, i.e. Flink on bare-metal.
  */
+@Internal
 public class StandaloneClientFactory implements ClusterClientFactory<StandaloneClusterId> {
 
 	public static final String ID = "default";
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
index ad391d8..9bd1145 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.client.deployment.ClusterClientFactory;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.configuration.Configuration;
@@ -40,6 +41,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * A {@link ClusterClientFactory} for a YARN cluster.
  */
+@Internal
 public class YarnClusterClientFactory implements ClusterClientFactory<ApplicationId> {
 
 	public static final String ID = "yarn-cluster";


[flink] 15/19: [FLINK-XXXXX] Fix job client lifecycle issue

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8e74ecd236a513330307cb46d4dccd2763b25255
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Sun Nov 17 16:51:17 2019 +0100

    [FLINK-XXXXX] Fix job client lifecycle issue
---
 .../client/deployment/executors/JobClientImpl.java |  5 +++++
 .../StandaloneSessionClusterExecutor.java          |  5 ++---
 .../org/apache/flink/core/execution/JobClient.java |  2 +-
 .../flink/api/java/ExecutionEnvironment.java       | 11 +++++-----
 .../flink/api/java/ExecutorDiscoveryTest.java      |  5 +++++
 .../environment/StreamExecutionEnvironment.java    |  9 ++++----
 .../environment/ExecutorDiscoveryTest.java         |  5 +++++
 .../yarn/executors/YarnJobClusterExecutor.java     |  7 +++---
 .../yarn/executors/YarnSessionClusterExecutor.java | 25 ++++++----------------
 9 files changed, 39 insertions(+), 35 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java
index e042369..c6f48bfe 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java
@@ -79,4 +79,9 @@ public class JobClientImpl<ClusterID> implements JobClient {
 		}));
 		return res;
 	}
+
+	@Override
+	public void close() throws Exception {
+		this.clusterClient.close();
+	}
 }
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
index e5cc82e..df4d2ba 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
@@ -66,9 +66,8 @@ public class StandaloneSessionClusterExecutor implements Executor {
 			final StandaloneClusterId clusterID = clusterClientFactory.getClusterId(configuration);
 			checkState(clusterID != null);
 
-			try (final RestClusterClient<StandaloneClusterId> clusterClient = clusterDescriptor.retrieve(clusterID)) {
-				return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
-			}
+			final RestClusterClient<StandaloneClusterId> clusterClient = clusterDescriptor.retrieve(clusterID);
+			return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
 		}
 	}
 
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
index 8440dd1..b4ab9a9 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
@@ -29,7 +29,7 @@ import java.util.concurrent.CompletableFuture;
  * A client that is scoped to a specific job.
  */
 @PublicEvolving
-public interface JobClient {
+public interface JobClient extends AutoCloseable {
 
 	CompletableFuture<JobExecutionResult> getJobSubmissionResult();
 
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 69abe17..c600eb9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -805,12 +805,13 @@ public class ExecutionEnvironment {
 
 		final Executor executor = executorFactory.getExecutor(configuration);
 
-		final JobClient jobClient = executor.execute(plan, configuration).get();
-		lastJobExecutionResult = configuration.getBoolean(DeploymentOptions.ATTACHED)
-				? jobClient.getJobExecutionResult(userClassloader).get()
-				: jobClient.getJobSubmissionResult().get();
+		try (final JobClient jobClient = executor.execute(plan, configuration).get()) {
+			lastJobExecutionResult = configuration.getBoolean(DeploymentOptions.ATTACHED)
+					? jobClient.getJobExecutionResult(userClassloader).get()
+					: jobClient.getJobSubmissionResult().get();
 
-		return lastJobExecutionResult;
+			return lastJobExecutionResult;
+		}
 	}
 
 	private void consolidateParallelismDefinitionsInConfiguration() {
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
index 9acbf3d..f5c34a5 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
@@ -95,6 +95,11 @@ public class ExecutorDiscoveryTest {
 					public CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull ClassLoader userClassloader) {
 						return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), 12L, res));
 					}
+
+					@Override
+					public void close() {
+
+					}
 				});
 			};
 		}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 7a59d5a..fdaaae0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1562,10 +1562,11 @@ public class StreamExecutionEnvironment {
 				executorServiceLoader.getExecutorFactory(configuration);
 
 		final Executor executor = executorFactory.getExecutor(configuration);
-		final JobClient jobClient = executor.execute(streamGraph, configuration).get();
-		return configuration.getBoolean(DeploymentOptions.ATTACHED)
-				? jobClient.getJobExecutionResult(userClassloader).get()
-				: jobClient.getJobSubmissionResult().get();
+		try (final JobClient jobClient = executor.execute(streamGraph, configuration).get()) {
+			return configuration.getBoolean(DeploymentOptions.ATTACHED)
+					? jobClient.getJobExecutionResult(userClassloader).get()
+					: jobClient.getJobSubmissionResult().get();
+		}
 	}
 
 	private void consolidateParallelismDefinitionsInConfiguration() {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
index ce593c2..97e4517 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
@@ -95,6 +95,11 @@ public class ExecutorDiscoveryTest {
 					public CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull ClassLoader userClassloader) {
 						return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), 12L, res));
 					}
+
+					@Override
+					public void close() {
+
+					}
 				});
 			};
 		}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
index 09094e7..ead6e9a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
@@ -73,10 +73,9 @@ public class YarnJobClusterExecutor implements Executor {
 
 			final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executionConfig);
 
-			try (final ClusterClient<ApplicationId> client = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode())) {
-				LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());
-				return CompletableFuture.completedFuture(new JobClientImpl<>(client, jobGraph.getJobID()));
-			}
+			final ClusterClient<ApplicationId> client = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode());
+			LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());
+			return CompletableFuture.completedFuture(new JobClientImpl<>(client, jobGraph.getJobID()));
 		}
 	}
 
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
index dd15d1b..de4d148 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
@@ -33,8 +33,6 @@ import org.apache.flink.yarn.YarnClusterDescriptor;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 
-import java.net.URL;
-import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -56,40 +54,31 @@ public class YarnSessionClusterExecutor implements Executor {
 
 	@Override
 	public CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration) throws Exception {
-		final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
-
-		final List<URL> dependencies = configAccessor.getJars();
-		final List<URL> classpaths = configAccessor.getClasspaths();
-
-		final JobGraph jobGraph = getJobGraph(pipeline, configuration, classpaths, dependencies);
+		final JobGraph jobGraph = getJobGraph(pipeline, configuration);
 
 		try (final YarnClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
 			final ApplicationId clusterID = clusterClientFactory.getClusterId(configuration);
 			checkState(clusterID != null);
 
-			try (final ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve(clusterID)) {
-				return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
-			}
+			// TODO: 17.11.19 we cannot close the client here because we simply have a future of the client
+			final ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve(clusterID);
+			return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
 		}
 	}
 
 	private JobGraph getJobGraph(
 			final Pipeline pipeline,
-			final Configuration configuration,
-			final List<URL> classpaths,
-			final List<URL> libraries) {
+			final Configuration configuration) {
 
 		checkNotNull(pipeline);
 		checkNotNull(configuration);
-		checkNotNull(classpaths);
-		checkNotNull(libraries);
 
 		final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
 		final JobGraph jobGraph = FlinkPipelineTranslationUtil
 				.getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());
 
-		jobGraph.addJars(libraries);
-		jobGraph.setClasspaths(classpaths);
+		jobGraph.addJars(executionConfigAccessor.getJars());
+		jobGraph.setClasspaths(executionConfigAccessor.getClasspaths());
 		jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
 
 		return jobGraph;
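
The net effect is that ownership of the ClusterClient moves into the JobClient, which now extends AutoCloseable, so callers can release it with try-with-resources once the result is in. A hedged sketch of that lifecycle pattern; class names match the diffs, while the Pipeline import path and the wrapper method are assumptions.

    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.api.dag.Pipeline;            // import path assumed
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.execution.Executor;
    import org.apache.flink.core.execution.JobClient;

    class AttachedExecutionSketch {
        static JobExecutionResult runAttached(
                Executor executor,
                Pipeline pipeline,
                Configuration configuration,
                ClassLoader userClassLoader) throws Exception {
            // JobClient is AutoCloseable now, so try-with-resources releases the
            // underlying ClusterClient once the result has been retrieved.
            try (JobClient jobClient = executor.execute(pipeline, configuration).get()) {
                return jobClient.getJobExecutionResult(userClassLoader).get();
            }
        }
    }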


[flink] 01/19: [hotfix] Fix parallelism consolidation logic in the (Stream)ExecutionEnvironment.

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3ca53c6dd68c54bc34dc397ea7e764d14e20f8aa
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Thu Nov 14 15:41:57 2019 +0100

    [hotfix] Fix parallelism consolidation logic in the (Stream)ExecutionEnvironment.
---
 .../java/org/apache/flink/api/java/ExecutionEnvironment.java   | 10 ++--------
 .../streaming/api/environment/StreamExecutionEnvironment.java  | 10 ++--------
 2 files changed, 4 insertions(+), 16 deletions(-)

diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 1c6fca1..5b07843 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -801,15 +801,9 @@ public class ExecutionEnvironment {
 	}
 
 	private void consolidateParallelismDefinitionsInConfiguration() {
-		final int execParallelism = getParallelism();
-		if (execParallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
-			return;
+		if (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT) {
+			configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM).ifPresent(this::setParallelism);
 		}
-
-		// if parallelism is set in the ExecutorConfig, then
-		// that value takes precedence over any other value.
-
-		configuration.set(CoreOptions.DEFAULT_PARALLELISM, execParallelism);
 	}
 
 	/**
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 721869c..3870b52 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1556,15 +1556,9 @@ public class StreamExecutionEnvironment {
 	}
 
 	private void consolidateParallelismDefinitionsInConfiguration() {
-		final int execParallelism = getParallelism();
-		if (execParallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
-			return;
+		if (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT) {
+			configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM).ifPresent(this::setParallelism);
 		}
-
-		// if parallelism is set in the ExecutorConfig, then
-		// that value takes precedence over any other value.
-
-		configuration.set(CoreOptions.DEFAULT_PARALLELISM, execParallelism);
 	}
 
 	/**
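
Stated as a rule: a parallelism set programmatically on the environment wins, and CoreOptions.DEFAULT_PARALLELISM from the configuration is only a fallback when the environment is still at PARALLELISM_DEFAULT. The standalone helper below restates that precedence for illustration; it is not Flink API.

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.CoreOptions;

    class ParallelismPrecedenceSketch {
        static int effectiveParallelism(int environmentParallelism, Configuration configuration) {
            if (environmentParallelism != ExecutionConfig.PARALLELISM_DEFAULT) {
                // A value set programmatically on the environment takes precedence.
                return environmentParallelism;
            }
            // Otherwise fall back to the configuration, if it defines one.
            return configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM)
                    .orElse(ExecutionConfig.PARALLELISM_DEFAULT);
        }
    }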


[flink] 08/19: [FLINK-XXXXX] Change Executor.execute() signature + add Session and Job Cluster Executors

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 54a277b08dfe8ecb702c516392b01f41078df993
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Tue Nov 12 12:23:26 2019 +0100

    [FLINK-XXXXX] Change Executor.execute() signature + add Session and Job Cluster Executors
    
    Change the signature of Executor#execute() to return a CompletableFuture<JobClient> and adapt the executor implementations accordingly.
---
 .../java/org/apache/flink/client/ClientUtils.java  |  30 +++----
 .../client/deployment/executors/JobClientImpl.java |  82 +++++++++++++++++
 .../deployment/executors/JobClusterExecutor.java   | 100 +++++++++++++++++++++
 .../executors/SessionClusterExecutor.java          |  94 +++++++++++++++++++
 .../apache/flink/client/program/ClientTest.java    |   5 ++
 .../org/apache/flink/core/execution/Executor.java  |   4 +-
 .../execution/{Executor.java => JobClient.java}    |  23 +++--
 .../flink/api/java/ExecutionEnvironment.java       |  15 +++-
 .../flink/api/java/ExecutorDiscoveryTest.java      |  19 +++-
 .../environment/StreamExecutionEnvironment.java    |  17 +++-
 .../environment/ExecutorDiscoveryTest.java         |  18 +++-
 11 files changed, 371 insertions(+), 36 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index ac247ac..4a95a16 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -19,22 +19,20 @@
 package org.apache.flink.client;
 
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.client.deployment.executors.JobClientImpl;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ContextEnvironment;
 import org.apache.flink.client.program.ContextEnvironmentFactory;
-import org.apache.flink.client.program.DetachedJobExecutionResult;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.client.program.ProgramMissingJobException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.slf4j.Logger;
@@ -101,22 +99,18 @@ public enum ClientUtils {
 		return FlinkUserCodeClassLoaders.create(resolveOrder, urls, parent, alwaysParentFirstLoaderPatterns);
 	}
 
-	public static CompletableFuture<JobID> submitJobAndGetJobID(ClusterClient<?> client, JobGraph jobGraph) {
+	public static CompletableFuture<JobClient> submitJobAndGetJobClient(ClusterClient<?> client, JobGraph jobGraph) {
 		return checkNotNull(client)
 				.submitJob(checkNotNull(jobGraph))
-				.thenApply(JobSubmissionResult::getJobID);
-	}
-
-	public static CompletableFuture<JobResult> submitJobAndGetResult(ClusterClient<?> client, JobGraph jobGraph) {
-		return submitJobAndGetJobID(client, jobGraph)
-				.thenCompose(client::requestJobResult);
+				.thenApply(JobSubmissionResult::getJobID)
+				.thenApply(jobID -> new JobClientImpl<>(client, jobID));
 	}
 
 	public static JobExecutionResult submitJob(ClusterClient<?> client, JobGraph jobGraph) throws ProgramInvocationException {
 		try {
-			return submitJobAndGetJobID(client, jobGraph)
-				.thenApply(DetachedJobExecutionResult::new)
-				.get();
+			return submitJobAndGetJobClient(client, jobGraph)
+					.thenCompose(JobClient::getJobSubmissionResult)
+					.get();
 		} catch (InterruptedException | ExecutionException e) {
 			ExceptionUtils.checkInterrupted(e);
 			throw new ProgramInvocationException("Could not run job in detached mode.", jobGraph.getJobID(), e);
@@ -129,17 +123,17 @@ public enum ClientUtils {
 			ClassLoader classLoader) throws ProgramInvocationException {
 		checkNotNull(classLoader);
 
-		JobResult jobResult;
+		JobClient jobClient;
 		try {
-			jobResult = submitJobAndGetResult(client, jobGraph).get();
+			jobClient = submitJobAndGetJobClient(client, jobGraph).get();
 		} catch (InterruptedException | ExecutionException e) {
 			ExceptionUtils.checkInterrupted(e);
 			throw new ProgramInvocationException("Could not run job", jobGraph.getJobID(), e);
 		}
 
 		try {
-			return jobResult.toJobExecutionResult(classLoader);
-		} catch (JobExecutionException | IOException | ClassNotFoundException e) {
+			return jobClient.getJobExecutionResult(classLoader).get();
+		} catch (Exception e) {
 			throw new ProgramInvocationException("Job failed", jobGraph.getJobID(), e);
 		}
 	}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java
new file mode 100644
index 0000000..e042369
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.executors;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.DetachedJobExecutionResult;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.util.ExceptionUtils;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link JobClient} that is bound to a specific job and backed by a {@link ClusterClient}.
+ */
+public class JobClientImpl<ClusterID> implements JobClient {
+
+	private final ClusterClient<ClusterID> clusterClient;
+
+	private final JobID jobID;
+
+	public JobClientImpl(
+			final ClusterClient<ClusterID> clusterClient,
+			final JobID jobID) {
+		this.jobID = checkNotNull(jobID);
+		this.clusterClient = checkNotNull(clusterClient);
+	}
+
+	public JobID getJobID() {
+		return jobID;
+	}
+
+	@Override
+	public CompletableFuture<JobExecutionResult> getJobSubmissionResult() {
+		return CompletableFuture.completedFuture(new DetachedJobExecutionResult(jobID));
+	}
+
+	@Override
+	public CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull final ClassLoader userClassloader) {
+		final CompletableFuture<JobExecutionResult> res = new CompletableFuture<>();
+
+		final CompletableFuture<JobResult> jobResultFuture = clusterClient.requestJobResult(jobID);
+		jobResultFuture.whenComplete(((jobResult, throwable) -> {
+			if (throwable != null) {
+				ExceptionUtils.checkInterrupted(throwable);
+				res.completeExceptionally(new ProgramInvocationException("Could not run job", jobID, throwable));
+			} else {
+				try {
+					res.complete(jobResult.toJobExecutionResult(userClassloader));
+				} catch (JobExecutionException | IOException | ClassNotFoundException e) {
+					res.completeExceptionally(new ProgramInvocationException("Job failed", jobID, e));
+				}
+			}
+		}));
+		return res;
+	}
+}
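
Taken together with the ClientUtils and (Stream)ExecutionEnvironment changes in this commit, the intended usage is roughly the following; clusterClient, jobGraph, attached and userClassLoader are stand-ins for whatever the caller already has:

    // Sketch only: submit the job, then let the caller decide whether to wait for completion.
    final CompletableFuture<JobClient> clientFuture =
            ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
    final JobClient jobClient = clientFuture.get();

    final JobExecutionResult result = attached
            ? jobClient.getJobExecutionResult(userClassLoader).get()  // blocks until the job finishes
            : jobClient.getJobSubmissionResult().get();               // detached: returns immediately
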
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutor.java
new file mode 100644
index 0000000..902d7fd
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutor.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.executors;
+
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.FlinkPipelineTranslationUtil;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The {@link Executor} to be used when executing a job in isolation.
+ * This executor will start a cluster specifically for the job at hand and
+ * tear it down when the job is finished either successfully or due to an error.
+ */
+public class JobClusterExecutor<ClusterID> implements Executor {
+
+	private static final Logger LOG = LoggerFactory.getLogger(JobClusterExecutor.class);
+
+	private final ClusterClientServiceLoader clusterClientServiceLoader;
+
+	public JobClusterExecutor(final ClusterClientServiceLoader clusterClientServiceLoader) {
+		this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
+	}
+
+	@Override
+	public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration executionConfig) throws Exception {
+		final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(executionConfig);
+
+		try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(executionConfig)) {
+			final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(executionConfig);
+
+			final List<URL> dependencies = configAccessor.getJars();
+			final List<URL> classpaths = configAccessor.getClasspaths();
+
+			final JobGraph jobGraph = getJobGraph(pipeline, executionConfig, classpaths, dependencies);
+
+			final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executionConfig);
+
+			try (final ClusterClient<ClusterID> client = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode())) {
+				LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());
+				return CompletableFuture.completedFuture(new JobClientImpl<>(client, jobGraph.getJobID()));
+			}
+		}
+	}
+
+	private JobGraph getJobGraph(
+			final Pipeline pipeline,
+			final Configuration configuration,
+			final List<URL> classpaths,
+			final List<URL> libraries) {
+
+		checkNotNull(pipeline);
+		checkNotNull(configuration);
+		checkNotNull(classpaths);
+		checkNotNull(libraries);
+
+		final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
+		final JobGraph jobGraph = FlinkPipelineTranslationUtil
+				.getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());
+
+		jobGraph.addJars(libraries);
+		jobGraph.setClasspaths(classpaths);
+		jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
+
+		return jobGraph;
+	}
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java
new file mode 100644
index 0000000..b7eeb68
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.executors;
+
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.FlinkPipelineTranslationUtil;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import java.net.URL;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The {@link Executor} to be used when executing a job on an already running cluster.
+ */
+public class SessionClusterExecutor<ClusterID> implements Executor {
+
+	private final ClusterClientServiceLoader clusterClientServiceLoader;
+
+	public SessionClusterExecutor(final ClusterClientServiceLoader clusterClientServiceLoader) {
+		this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
+	}
+
+	@Override
+	public CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration) throws Exception {
+		final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
+
+		final List<URL> dependencies = configAccessor.getJars();
+		final List<URL> classpaths = configAccessor.getClasspaths();
+
+		final JobGraph jobGraph = getJobGraph(pipeline, configuration, classpaths, dependencies);
+
+		final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(configuration);
+
+		try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
+			final ClusterID clusterID = clusterClientFactory.getClusterId(configuration);
+			checkState(clusterID != null);
+
+			try (final ClusterClient<ClusterID> clusterClient = clusterDescriptor.retrieve(clusterID)) {
+				return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
+			}
+		}
+	}
+
+	private JobGraph getJobGraph(
+			final Pipeline pipeline,
+			final Configuration configuration,
+			final List<URL> classpaths,
+			final List<URL> libraries) {
+
+		checkNotNull(pipeline);
+		checkNotNull(configuration);
+		checkNotNull(classpaths);
+		checkNotNull(libraries);
+
+		final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
+		final JobGraph jobGraph = FlinkPipelineTranslationUtil
+				.getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());
+
+		jobGraph.addJars(libraries);
+		jobGraph.setClasspaths(classpaths);
+		jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
+
+		return jobGraph;
+	}
+}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 5845080..6e53abb 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -64,6 +64,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * Simple and maybe stupid test to check the {@link ClusterClient} class.
@@ -193,6 +194,10 @@ public class ClientTest extends TestLogger {
 	@Test
 	public void tryLocalExecution() throws ProgramInvocationException, ProgramMissingJobException {
 		PackagedProgram packagedProgramMock = mock(PackagedProgram.class);
+
+		when(packagedProgramMock.getUserCodeClassLoader())
+				.thenReturn(packagedProgramMock.getClass().getClassLoader());
+
 		doAnswer(new Answer<Void>() {
 			@Override
 			public Void answer(InvocationOnMock invocation) throws Throwable {
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
index 8515f43..5be3193 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.configuration.Configuration;
 
+import java.util.concurrent.CompletableFuture;
+
 /**
  * The entity responsible for executing a {@link Pipeline}, i.e. a user job.
  */
@@ -34,5 +36,5 @@ public interface Executor {
 	 * @param configuration the {@link Configuration} with the required execution parameters
 	 * @return the {@link JobExecutionResult} corresponding to the pipeline execution.
 	 */
-	JobExecutionResult execute(Pipeline pipeline, Configuration configuration) throws Exception;
+	CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception;
 }
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
similarity index 60%
copy from flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
copy to flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
index 8515f43..8440dd1 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
@@ -18,21 +18,20 @@
 
 package org.apache.flink.core.execution;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.dag.Pipeline;
-import org.apache.flink.configuration.Configuration;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
 
 /**
- * The entity responsible for executing a {@link Pipeline}, i.e. a user job.
+ * A client that is scoped to a specific job.
  */
-public interface Executor {
+@PublicEvolving
+public interface JobClient {
+
+	CompletableFuture<JobExecutionResult> getJobSubmissionResult();
 
-	/**
-	 * Executes a {@link Pipeline} based on the provided configuration.
-	 *
-	 * @param pipeline the {@link Pipeline} to execute
-	 * @param configuration the {@link Configuration} with the required execution parameters
-	 * @return the {@link JobExecutionResult} corresponding to the pipeline execution.
-	 */
-	JobExecutionResult execute(Pipeline pipeline, Configuration configuration) throws Exception;
+	CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull final ClassLoader userClassloader);
 }
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 5b07843..69abe17 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -57,6 +57,7 @@ import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.ExecutorFactory;
 import org.apache.flink.core.execution.ExecutorServiceLoader;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.NumberSequenceIterator;
@@ -132,6 +133,8 @@ public class ExecutionEnvironment {
 
 	private final Configuration configuration;
 
+	private ClassLoader userClassloader;
+
 	/**
 	 * Creates a new Execution Environment.
 	 */
@@ -146,6 +149,11 @@ public class ExecutionEnvironment {
 	protected ExecutionEnvironment(final ExecutorServiceLoader executorServiceLoader, final Configuration executorConfiguration) {
 		this.executorServiceLoader = checkNotNull(executorServiceLoader);
 		this.configuration = checkNotNull(executorConfiguration);
+		this.userClassloader = getClass().getClassLoader();
+	}
+
+	protected void setUserClassloader(final ClassLoader userClassloader) {
+		this.userClassloader = checkNotNull(userClassloader);
 	}
 
 	protected Configuration getConfiguration() {
@@ -796,7 +804,12 @@ public class ExecutionEnvironment {
 				executorServiceLoader.getExecutorFactory(configuration);
 
 		final Executor executor = executorFactory.getExecutor(configuration);
-		lastJobExecutionResult = executor.execute(plan, configuration);
+
+		final JobClient jobClient = executor.execute(plan, configuration).get();
+		lastJobExecutionResult = configuration.getBoolean(DeploymentOptions.ATTACHED)
+				? jobClient.getJobExecutionResult(userClassloader).get()
+				: jobClient.getJobSubmissionResult().get();
+
 		return lastJobExecutionResult;
 	}
 
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
index 49013b8..9acbf3d 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
@@ -25,13 +25,17 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.ExecutorFactory;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.util.OptionalFailure;
 
 import org.junit.Test;
 
+import javax.annotation.Nonnull;
+
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
@@ -46,6 +50,7 @@ public class ExecutorDiscoveryTest {
 	public void correctExecutorShouldBeInstantiatedBasedOnConfigurationOption() throws Exception {
 		final Configuration configuration = new Configuration();
 		configuration.set(DeploymentOptions.TARGET, IDReportingExecutorFactory.ID);
+		configuration.set(DeploymentOptions.ATTACHED, true);
 
 		final JobExecutionResult result = executeTestJobBasedOnConfig(configuration);
 
@@ -78,7 +83,19 @@ public class ExecutorDiscoveryTest {
 			return (pipeline, executionConfig) -> {
 				final Map<String, OptionalFailure<Object>> res = new HashMap<>();
 				res.put(DeploymentOptions.TARGET.key(), OptionalFailure.of(ID));
-				return new JobExecutionResult(new JobID(), 12L, res);
+
+				return CompletableFuture.completedFuture(new JobClient(){
+
+					@Override
+					public CompletableFuture<JobExecutionResult> getJobSubmissionResult() {
+						throw new UnsupportedOperationException();
+					}
+
+					@Override
+					public CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull ClassLoader userClassloader) {
+						return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), 12L, res));
+					}
+				});
 			};
 		}
 	}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 3870b52..7a59d5a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -53,6 +53,7 @@ import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.ExecutorFactory;
 import org.apache.flink.core.execution.ExecutorServiceLoader;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -154,6 +155,8 @@ public class StreamExecutionEnvironment {
 
 	private final Configuration configuration;
 
+	private ClassLoader userClassloader;
+
 	// --------------------------------------------------------------------------------------------
 	// Constructor and Properties
 	// --------------------------------------------------------------------------------------------
@@ -166,9 +169,16 @@ public class StreamExecutionEnvironment {
 		this(new DefaultExecutorServiceLoader(), executorConfiguration);
 	}
 
-	public StreamExecutionEnvironment(final ExecutorServiceLoader executorServiceLoader, final Configuration executorConfiguration) {
+	public StreamExecutionEnvironment(
+			final ExecutorServiceLoader executorServiceLoader,
+			final Configuration executorConfiguration) {
 		this.executorServiceLoader = checkNotNull(executorServiceLoader);
 		this.configuration = checkNotNull(executorConfiguration);
+		this.userClassloader = getClass().getClassLoader();
+	}
+
+	protected void setUserClassloader(final ClassLoader userClassloader) {
+		this.userClassloader = checkNotNull(userClassloader);
 	}
 
 	protected Configuration getConfiguration() {
@@ -1552,7 +1562,10 @@ public class StreamExecutionEnvironment {
 				executorServiceLoader.getExecutorFactory(configuration);
 
 		final Executor executor = executorFactory.getExecutor(configuration);
-		return executor.execute(streamGraph, configuration);
+		final JobClient jobClient = executor.execute(streamGraph, configuration).get();
+		return configuration.getBoolean(DeploymentOptions.ATTACHED)
+				? jobClient.getJobExecutionResult(userClassloader).get()
+				: jobClient.getJobSubmissionResult().get();
 	}
 
 	private void consolidateParallelismDefinitionsInConfiguration() {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
index 9c11fdf..ce593c2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
@@ -24,15 +24,19 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.ExecutorFactory;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.util.OptionalFailure;
 
 import org.junit.Test;
 
+import javax.annotation.Nonnull;
+
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
@@ -47,6 +51,7 @@ public class ExecutorDiscoveryTest {
 	public void correctExecutorShouldBeInstantiatedBasedOnConfigurationOption() throws Exception {
 		final Configuration configuration = new Configuration();
 		configuration.set(DeploymentOptions.TARGET, IDReportingExecutorFactory.ID);
+		configuration.set(DeploymentOptions.ATTACHED, true);
 
 		final JobExecutionResult result = executeTestJobBasedOnConfig(configuration);
 
@@ -79,7 +84,18 @@ public class ExecutorDiscoveryTest {
 			return (pipeline, executionConfig) -> {
 				final Map<String, OptionalFailure<Object>> res = new HashMap<>();
 				res.put(DeploymentOptions.TARGET.key(), OptionalFailure.of(ID));
-				return new JobExecutionResult(new JobID(), 12L, res);
+				return CompletableFuture.completedFuture(new JobClient(){
+
+					@Override
+					public CompletableFuture<JobExecutionResult> getJobSubmissionResult() {
+						throw new UnsupportedOperationException();
+					}
+
+					@Override
+					public CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull ClassLoader userClassloader) {
+						return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), 12L, res));
+					}
+				});
 			};
 		}
 	}


[flink] 11/19: [FLINK-XXXXX] Add standalone/yarn executors and their factories

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f46bb96980a6441497c93f6823ad13480b3d340d
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Sat Nov 16 21:32:08 2019 +0100

    [FLINK-XXXXX] Add standalone/yarn executors and their factories
---
 ....java => StandaloneSessionClusterExecutor.java} | 28 +++++++++++----------
 ...> StandaloneSessionClusterExecutorFactory.java} | 21 +++-------------
 ...org.apache.flink.core.execution.ExecutorFactory |  3 +--
 .../yarn/executors/YarnJobClusterExecutor.java     | 28 ++++++++++++---------
 .../executors/YarnJobClusterExecutorFactory.java   | 23 +++--------------
 .../yarn/executors/YarnSessionClusterExecutor.java | 29 ++++++++++++----------
 .../YarnSessionClusterExecutorFactory.java         | 23 +++--------------
 ...org.apache.flink.core.execution.ExecutorFactory |  4 +--
 8 files changed, 61 insertions(+), 98 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
similarity index 74%
copy from flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java
copy to flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
index b7eeb68..e5cc82e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
@@ -18,14 +18,15 @@
 
 package org.apache.flink.client.deployment.executors;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.FlinkPipelineTranslationUtil;
 import org.apache.flink.client.cli.ExecutionConfigAccessor;
-import org.apache.flink.client.deployment.ClusterClientFactory;
-import org.apache.flink.client.deployment.ClusterClientServiceLoader;
-import org.apache.flink.client.deployment.ClusterDescriptor;
-import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.deployment.StandaloneClientFactory;
+import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.JobClient;
@@ -41,12 +42,15 @@ import static org.apache.flink.util.Preconditions.checkState;
 /**
  * The {@link Executor} to be used when executing a job on an already running cluster.
  */
-public class SessionClusterExecutor<ClusterID> implements Executor {
+@Internal
+public class StandaloneSessionClusterExecutor implements Executor {
 
-	private final ClusterClientServiceLoader clusterClientServiceLoader;
+	public static final String NAME = "standalone-session-cluster";
 
-	public SessionClusterExecutor(final ClusterClientServiceLoader clusterClientServiceLoader) {
-		this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
+	private final StandaloneClientFactory clusterClientFactory;
+
+	public StandaloneSessionClusterExecutor() {
+		this.clusterClientFactory = new StandaloneClientFactory();
 	}
 
 	@Override
@@ -58,13 +62,11 @@ public class SessionClusterExecutor<ClusterID> implements Executor {
 
 		final JobGraph jobGraph = getJobGraph(pipeline, configuration, classpaths, dependencies);
 
-		final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(configuration);
-
-		try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
-			final ClusterID clusterID = clusterClientFactory.getClusterId(configuration);
+		try (final StandaloneClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
+			final StandaloneClusterId clusterID = clusterClientFactory.getClusterId(configuration);
 			checkState(clusterID != null);
 
-			try (final ClusterClient<ClusterID> clusterClient = clusterDescriptor.retrieve(clusterID)) {
+			try (final RestClusterClient<StandaloneClusterId> clusterClient = clusterDescriptor.retrieve(clusterID)) {
 				return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
 			}
 		}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutorFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutorFactory.java
similarity index 62%
copy from flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutorFactory.java
copy to flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutorFactory.java
index 8e5a5eb..06dd451 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutorFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutorFactory.java
@@ -19,39 +19,24 @@
 package org.apache.flink.client.deployment.executors;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.client.deployment.ClusterClientServiceLoader;
-import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
-import org.apache.flink.configuration.ClusterMode;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.ExecutorFactory;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * An {@link ExecutorFactory} for executing jobs on an existing (session) cluster.
  */
 @Internal
-public class SessionClusterExecutorFactory implements ExecutorFactory {
-
-	private final ClusterClientServiceLoader clusterClientServiceLoader;
-
-	public SessionClusterExecutorFactory() {
-		this(new DefaultClusterClientServiceLoader());
-	}
-
-	public SessionClusterExecutorFactory(final ClusterClientServiceLoader clusterClientServiceLoader) {
-		this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
-	}
+public class StandaloneSessionClusterExecutorFactory implements ExecutorFactory {
 
 	@Override
 	public boolean isCompatibleWith(Configuration configuration) {
-		return configuration.get(DeploymentOptions.CLUSTER_MODE).equals(ClusterMode.SESSION);
+		return configuration.get(DeploymentOptions.TARGET).equalsIgnoreCase(StandaloneSessionClusterExecutor.NAME);
 	}
 
 	@Override
 	public Executor getExecutor(Configuration configuration) {
-		return new SessionClusterExecutor<>(clusterClientServiceLoader);
+		return new StandaloneSessionClusterExecutor();
 	}
 }
diff --git a/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory b/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
index 870c57d..d9b144f 100644
--- a/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
+++ b/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
@@ -13,5 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.client.deployment.executors.JobClusterExecutorFactory
-org.apache.flink.client.deployment.executors.SessionClusterExecutorFactory
\ No newline at end of file
+org.apache.flink.client.deployment.executors.StandaloneSessionClusterExecutorFactory
\ No newline at end of file
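
The executor service loader itself is not touched by this commit, but registering the factories under META-INF/services suggests plain java.util.ServiceLoader discovery. A rough, illustrative sketch of how a factory would then be matched against the configured target (the method name is made up):

    // Illustrative only: iterate the registered factories and pick the one whose
    // isCompatibleWith() accepts the configured execution target.
    static ExecutorFactory findExecutorFactory(final Configuration configuration) {
        for (ExecutorFactory factory : ServiceLoader.load(ExecutorFactory.class)) {
            if (factory.isCompatibleWith(configuration)) {
                return factory;
            }
        }
        throw new IllegalStateException(
                "No ExecutorFactory found for target: " + configuration.get(DeploymentOptions.TARGET));
    }
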
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
similarity index 75%
rename from flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutor.java
rename to flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
index 902d7fd..09094e7 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
@@ -16,21 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.flink.client.deployment.executors;
+package org.apache.flink.yarn.executors;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.client.FlinkPipelineTranslationUtil;
 import org.apache.flink.client.cli.ExecutionConfigAccessor;
-import org.apache.flink.client.deployment.ClusterClientFactory;
-import org.apache.flink.client.deployment.ClusterClientServiceLoader;
-import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.deployment.executors.JobClientImpl;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.yarn.YarnClusterClientFactory;
+import org.apache.flink.yarn.YarnClusterDescriptor;
 
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,21 +47,23 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * This executor will start a cluster specifically for the job at hand and
  * tear it down when the job is finished either successfully or due to an error.
  */
-public class JobClusterExecutor<ClusterID> implements Executor {
+@Internal
+public class YarnJobClusterExecutor implements Executor {
 
-	private static final Logger LOG = LoggerFactory.getLogger(JobClusterExecutor.class);
+	private static final Logger LOG = LoggerFactory.getLogger(YarnJobClusterExecutor.class);
 
-	private final ClusterClientServiceLoader clusterClientServiceLoader;
+	public static final String NAME = "yarn-job-cluster";
 
-	public JobClusterExecutor(final ClusterClientServiceLoader clusterClientServiceLoader) {
-		this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
+	private final YarnClusterClientFactory clusterClientFactory;
+
+	public YarnJobClusterExecutor() {
+		this.clusterClientFactory = new YarnClusterClientFactory();
 	}
 
 	@Override
 	public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration executionConfig) throws Exception {
-		final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(executionConfig);
 
-		try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(executionConfig)) {
+		try (final YarnClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(executionConfig)) {
 			final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(executionConfig);
 
 			final List<URL> dependencies = configAccessor.getJars();
@@ -69,7 +73,7 @@ public class JobClusterExecutor<ClusterID> implements Executor {
 
 			final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executionConfig);
 
-			try (final ClusterClient<ClusterID> client = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode())) {
+			try (final ClusterClient<ApplicationId> client = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode())) {
 				LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());
 				return CompletableFuture.completedFuture(new JobClientImpl<>(client, jobGraph.getJobID()));
 			}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutorFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java
similarity index 60%
rename from flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutorFactory.java
rename to flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java
index 5e10984..408a819 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutorFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java
@@ -16,42 +16,27 @@
  * limitations under the License.
  */
 
-package org.apache.flink.client.deployment.executors;
+package org.apache.flink.yarn.executors;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.client.deployment.ClusterClientServiceLoader;
-import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
-import org.apache.flink.configuration.ClusterMode;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.ExecutorFactory;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * An {@link ExecutorFactory} for executing jobs on dedicated (per-job) clusters.
  */
 @Internal
-public class JobClusterExecutorFactory implements ExecutorFactory {
-
-	private final ClusterClientServiceLoader clusterClientServiceLoader;
-
-	public JobClusterExecutorFactory() {
-		this(new DefaultClusterClientServiceLoader());
-	}
-
-	public JobClusterExecutorFactory(final ClusterClientServiceLoader clusterClientServiceLoader) {
-		this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
-	}
+public class YarnJobClusterExecutorFactory implements ExecutorFactory {
 
 	@Override
 	public boolean isCompatibleWith(Configuration configuration) {
-		return configuration.get(DeploymentOptions.CLUSTER_MODE).equals(ClusterMode.PER_JOB);
+		return configuration.get(DeploymentOptions.TARGET).equalsIgnoreCase(YarnJobClusterExecutor.NAME);
 	}
 
 	@Override
 	public Executor getExecutor(Configuration configuration) {
-		return new JobClusterExecutor<>(clusterClientServiceLoader);
+		return new YarnJobClusterExecutor();
 	}
 }
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
similarity index 74%
rename from flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java
rename to flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
index b7eeb68..dd15d1b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
@@ -16,20 +16,22 @@
  * limitations under the License.
  */
 
-package org.apache.flink.client.deployment.executors;
+package org.apache.flink.yarn.executors;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.FlinkPipelineTranslationUtil;
 import org.apache.flink.client.cli.ExecutionConfigAccessor;
-import org.apache.flink.client.deployment.ClusterClientFactory;
-import org.apache.flink.client.deployment.ClusterClientServiceLoader;
-import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.yarn.YarnClusterClientFactory;
+import org.apache.flink.yarn.YarnClusterDescriptor;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 
 import java.net.URL;
 import java.util.List;
@@ -41,12 +43,15 @@ import static org.apache.flink.util.Preconditions.checkState;
 /**
  * The {@link Executor} to be used when executing a job on an already running cluster.
  */
-public class SessionClusterExecutor<ClusterID> implements Executor {
+@Internal
+public class YarnSessionClusterExecutor implements Executor {
+
+	public static final String NAME = "yarn-session-cluster";
 
-	private final ClusterClientServiceLoader clusterClientServiceLoader;
+	private final YarnClusterClientFactory clusterClientFactory;
 
-	public SessionClusterExecutor(final ClusterClientServiceLoader clusterClientServiceLoader) {
-		this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
+	public YarnSessionClusterExecutor() {
+		this.clusterClientFactory = new YarnClusterClientFactory();
 	}
 
 	@Override
@@ -58,13 +63,11 @@ public class SessionClusterExecutor<ClusterID> implements Executor {
 
 		final JobGraph jobGraph = getJobGraph(pipeline, configuration, classpaths, dependencies);
 
-		final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(configuration);
-
-		try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
-			final ClusterID clusterID = clusterClientFactory.getClusterId(configuration);
+		try (final YarnClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
+			final ApplicationId clusterID = clusterClientFactory.getClusterId(configuration);
 			checkState(clusterID != null);
 
-			try (final ClusterClient<ClusterID> clusterClient = clusterDescriptor.retrieve(clusterID)) {
+			try (final ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve(clusterID)) {
 				return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
 			}
 		}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutorFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java
similarity index 59%
rename from flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutorFactory.java
rename to flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java
index 8e5a5eb..5b6b847 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutorFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java
@@ -16,42 +16,27 @@
  * limitations under the License.
  */
 
-package org.apache.flink.client.deployment.executors;
+package org.apache.flink.yarn.executors;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.client.deployment.ClusterClientServiceLoader;
-import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
-import org.apache.flink.configuration.ClusterMode;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.ExecutorFactory;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * An {@link ExecutorFactory} for executing jobs on an existing (session) cluster.
  */
 @Internal
-public class SessionClusterExecutorFactory implements ExecutorFactory {
-
-	private final ClusterClientServiceLoader clusterClientServiceLoader;
-
-	public SessionClusterExecutorFactory() {
-		this(new DefaultClusterClientServiceLoader());
-	}
-
-	public SessionClusterExecutorFactory(final ClusterClientServiceLoader clusterClientServiceLoader) {
-		this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
-	}
+public class YarnSessionClusterExecutorFactory implements ExecutorFactory {
 
 	@Override
 	public boolean isCompatibleWith(Configuration configuration) {
-		return configuration.get(DeploymentOptions.CLUSTER_MODE).equals(ClusterMode.SESSION);
+		return configuration.get(DeploymentOptions.TARGET).equalsIgnoreCase(YarnSessionClusterExecutor.NAME);
 	}
 
 	@Override
 	public Executor getExecutor(Configuration configuration) {
-		return new SessionClusterExecutor<>(clusterClientServiceLoader);
+		return new YarnSessionClusterExecutor();
 	}
 }
diff --git a/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory b/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
similarity index 84%
copy from flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
copy to flink-yarn/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
index 870c57d..d56f8c5 100644
--- a/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
+++ b/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
@@ -13,5 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.client.deployment.executors.JobClusterExecutorFactory
-org.apache.flink.client.deployment.executors.SessionClusterExecutorFactory
\ No newline at end of file
+org.apache.flink.yarn.executors.YarnJobClusterExecutorFactory
+org.apache.flink.yarn.executors.YarnSessionClusterExecutorFactory
\ No newline at end of file
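
To route a job to one of these executors, the configured deployment target has to match the executor's NAME constant, for example (using the constants and options shown in the diffs above):

    // Select the YARN session executor for this job.
    final Configuration configuration = new Configuration();
    configuration.set(DeploymentOptions.TARGET, YarnSessionClusterExecutor.NAME);  // "yarn-session-cluster"
    configuration.set(DeploymentOptions.ATTACHED, true);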


[flink] 02/19: [hotfix] ignore empty yarn properties

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit accfbf59b0a843697b55cce290d221344ef6c94d
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Sun Nov 17 16:50:18 2019 +0100

    [hotfix] ignore empty yarn properties
---
 .../src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java  | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 3fb8dfa..362deda 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -416,7 +416,9 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
 		}
 
 		final String dynamicPropertiesEncoded = encodeDynamicProperties(commandLine);
-		configuration.setString(YarnConfigOptionsInternal.DYNAMIC_PROPERTIES, dynamicPropertiesEncoded);
+		if (dynamicPropertiesEncoded != null && !dynamicPropertiesEncoded.isEmpty()) {
+			configuration.setString(YarnConfigOptionsInternal.DYNAMIC_PROPERTIES, dynamicPropertiesEncoded);
+		}
 
 		final boolean detached = commandLine.hasOption(YARN_DETACHED_OPTION.getOpt()) || commandLine.hasOption(DETACHED_OPTION.getOpt());
 		configuration.setBoolean(DeploymentOptions.ATTACHED, !detached);


[flink] 04/19: [hotfix] Merge configurations in argument list of runProgram() in CliFrontend

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c3141f2873c5aec8f30d468b89ef304edc253635
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Thu Nov 14 15:08:25 2019 +0100

    [hotfix] Merge configurations in argument list of runProgram() in CliFrontend
---
 .../org/apache/flink/client/cli/CliFrontend.java   | 44 ++++++++++++++--------
 .../flink/client/cli/ExecutionConfigAccessor.java  |  5 ++-
 2 files changed, 31 insertions(+), 18 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 38243fc..5ed2901 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -208,33 +208,45 @@ public class CliFrontend {
 			throw new CliArgsException("Could not build the program from JAR file.", e);
 		}
 
-		final CustomCommandLine customCommandLine = getActiveCustomCommandLine(commandLine);
-		final Configuration executorConfig = customCommandLine.applyCommandLineOptionsToConfiguration(commandLine);
-
-		final List<URL> jobJars = program.getJobJarAndDependencies();
-		final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromProgramOptions(programOptions, jobJars);
-		final Configuration executionConfig = executionParameters.getConfiguration();
+		final Configuration effectiveConfiguration = getEffectiveConfiguration(
+				commandLine,
+				programOptions,
+				program.getJobJarAndDependencies());
 
 		try {
-			runProgram(executorConfig, executionConfig, program);
+			runProgram(effectiveConfiguration, program);
 		} finally {
 			program.deleteExtractedLibraries();
 		}
 	}
 
+	private Configuration getEffectiveConfiguration(
+			final CommandLine commandLine,
+			final ProgramOptions programOptions,
+			final List<URL> jobJars) throws FlinkException {
+
+		final CustomCommandLine customCommandLine = getActiveCustomCommandLine(checkNotNull(commandLine));
+		final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromProgramOptions(
+				checkNotNull(programOptions),
+				checkNotNull(jobJars));
+
+		final Configuration executorConfig = customCommandLine.applyCommandLineOptionsToConfiguration(commandLine);
+		final Configuration effectiveConfiguration = new Configuration(executorConfig);
+		return executionParameters.applyToConfiguration(effectiveConfiguration);
+	}
+
 	private <ClusterID> void runProgram(
-			Configuration executorConfig,
-			Configuration executionConfig,
+			Configuration configuration,
 			PackagedProgram program) throws ProgramInvocationException, FlinkException {
 
-		final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(executorConfig);
+		final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(configuration);
 		checkNotNull(clusterClientFactory);
 
-		final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(executorConfig);
+		final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration);
 
 		try {
-			final ClusterID clusterId = clusterClientFactory.getClusterId(executorConfig);
-			final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromConfiguration(executionConfig);
+			final ClusterID clusterId = clusterClientFactory.getClusterId(configuration);
+			final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromConfiguration(configuration);
 			final ClusterClient<ClusterID> client;
 
 			// directly deploy the job if the cluster is started in job mode and detached
@@ -243,7 +255,7 @@ public class CliFrontend {
 
 				final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);
 
-				final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executorConfig);
+				final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
 				client = clusterDescriptor.deployJobCluster(
 					clusterSpecification,
 					jobGraph,
@@ -264,7 +276,7 @@ public class CliFrontend {
 				} else {
 					// also in job mode we have to deploy a session cluster because the job
 					// might consist of multiple parts (e.g. when using collect)
-					final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executorConfig);
+					final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
 					client = clusterDescriptor.deploySessionCluster(clusterSpecification);
 					// if not running in detached mode, add a shutdown hook to shut down cluster if client exits
 					// there's a race-condition here if cli is killed before shutdown hook is installed
@@ -279,7 +291,7 @@ public class CliFrontend {
 					int userParallelism = executionParameters.getParallelism();
 					LOG.debug("User parallelism is set to {}", userParallelism);
 
-					executeProgram(executionConfig, program, client);
+					executeProgram(configuration, program, client);
 				} finally {
 					if (clusterId == null && !executionParameters.getDetachedMode()) {
 						// terminate the cluster only if we have started it before and if it's not detached
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
index 9e570e1..f55560b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
@@ -77,8 +77,9 @@ public class ExecutionConfigAccessor {
 		return new ExecutionConfigAccessor(configuration);
 	}
 
-	public Configuration getConfiguration() {
-		return configuration;
+	Configuration applyToConfiguration(final Configuration baseConfiguration) {
+		baseConfiguration.addAll(configuration);
+		return baseConfiguration;
 	}
 
 	public List<URL> getJars() {
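
The net effect of getEffectiveConfiguration() is a two-layer merge: start from a copy of the executor options derived from the command line, then overlay the program-derived options with Configuration.addAll, so the program options win on conflicting keys. A hedged sketch of that precedence, with made-up values and a hypothetical wrapper method:

    import org.apache.flink.configuration.Configuration;

    static Configuration mergeSketch() {
        final Configuration executorConfig = new Configuration();   // from applyCommandLineOptionsToConfiguration()
        executorConfig.setString("execution.target", "yarn-session-cluster");
        executorConfig.setInteger("parallelism.default", 4);

        final Configuration programConfig = new Configuration();    // from ExecutionConfigAccessor.fromProgramOptions()
        programConfig.setInteger("parallelism.default", 16);

        // same layering as applyToConfiguration(): copy the base, then add everything on top
        final Configuration effective = new Configuration(executorConfig);
        effective.addAll(programConfig);

        // effective now holds execution.target=yarn-session-cluster and parallelism.default=16
        return effective;
    }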


[flink] 14/19: Wired everything together

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit df355d9c4a282ff93f8fbddacfc1c7df0f774ef3
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Sun Nov 17 13:49:02 2019 +0100

    Wired everything together
---
 .../java/org/apache/flink/client/ClientUtils.java  |   1 -
 .../org/apache/flink/client/cli/CliFrontend.java   | 102 ++-------------------
 .../flink/client/cli/CliFrontendRunTest.java       |   3 +-
 .../execution/DefaultExecutorServiceLoader.java    |   2 +-
 4 files changed, 10 insertions(+), 98 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index 5824832..f971982 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -30,7 +30,6 @@ import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.client.program.ProgramMissingJobException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
 import org.apache.flink.core.execution.ExecutorServiceLoader;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 258708a..8dfe306 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -30,7 +30,6 @@ import org.apache.flink.client.FlinkPipelineTranslationUtil;
 import org.apache.flink.client.deployment.ClusterClientFactory;
 import org.apache.flink.client.deployment.ClusterClientServiceLoader;
 import org.apache.flink.client.deployment.ClusterDescriptor;
-import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
@@ -45,11 +44,11 @@ import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
+import org.apache.flink.core.execution.ExecutorServiceLoader;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.security.SecurityConfiguration;
@@ -57,7 +56,6 @@ import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.ShutdownHookUtil;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
@@ -215,7 +213,7 @@ public class CliFrontend {
 				program.getJobJarAndDependencies());
 
 		try {
-			runProgram(effectiveConfiguration, program);
+			execute(effectiveConfiguration, program);
 		} finally {
 			program.deleteExtractedLibraries();
 		}
@@ -236,92 +234,6 @@ public class CliFrontend {
 		return executionParameters.applyToConfiguration(effectiveConfiguration);
 	}
 
-	private <ClusterID> void runProgram(
-			Configuration configuration,
-			PackagedProgram program) throws ProgramInvocationException, FlinkException {
-
-		final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(configuration);
-		checkNotNull(clusterClientFactory);
-
-		final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration);
-
-		try {
-			final ClusterID clusterId = clusterClientFactory.getClusterId(configuration);
-			final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromConfiguration(configuration);
-			final ClusterClient<ClusterID> client;
-
-			// directly deploy the job if the cluster is started in job mode and detached
-			if (clusterId == null && executionParameters.getDetachedMode()) {
-				int parallelism = executionParameters.getParallelism() == -1 ? defaultParallelism : executionParameters.getParallelism();
-
-				final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);
-
-				final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
-				client = clusterDescriptor.deployJobCluster(
-					clusterSpecification,
-					jobGraph,
-					executionParameters.getDetachedMode());
-
-				logAndSysout("Job has been submitted with JobID " + jobGraph.getJobID());
-
-				try {
-					client.close();
-				} catch (Exception e) {
-					LOG.info("Could not properly shut down the client.", e);
-				}
-			} else {
-				final Thread shutdownHook;
-				if (clusterId != null) {
-					client = clusterDescriptor.retrieve(clusterId);
-					shutdownHook = null;
-				} else {
-					// also in job mode we have to deploy a session cluster because the job
-					// might consist of multiple parts (e.g. when using collect)
-					final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
-					client = clusterDescriptor.deploySessionCluster(clusterSpecification);
-					// if not running in detached mode, add a shutdown hook to shut down cluster if client exits
-					// there's a race-condition here if cli is killed before shutdown hook is installed
-					if (!executionParameters.getDetachedMode() && executionParameters.isShutdownOnAttachedExit()) {
-						shutdownHook = ShutdownHookUtil.addShutdownHook(client::shutDownCluster, client.getClass().getSimpleName(), LOG);
-					} else {
-						shutdownHook = null;
-					}
-				}
-
-				try {
-					int userParallelism = executionParameters.getParallelism();
-					LOG.debug("User parallelism is set to {}", userParallelism);
-
-					executeProgram(configuration, program);
-				} finally {
-					if (clusterId == null && !executionParameters.getDetachedMode()) {
-						// terminate the cluster only if we have started it before and if it's not detached
-						try {
-							client.shutDownCluster();
-						} catch (final Exception e) {
-							LOG.info("Could not properly terminate the Flink cluster.", e);
-						}
-						if (shutdownHook != null) {
-							// we do not need the hook anymore as we have just tried to shutdown the cluster.
-							ShutdownHookUtil.removeShutdownHook(shutdownHook, client.getClass().getSimpleName(), LOG);
-						}
-					}
-					try {
-						client.close();
-					} catch (Exception e) {
-						LOG.info("Could not properly shut down the client.", e);
-					}
-				}
-			}
-		} finally {
-			try {
-				clusterDescriptor.close();
-			} catch (Exception e) {
-				LOG.info("Could not properly close the cluster descriptor.", e);
-			}
-		}
-	}
-
 	/**
 	 * Executes the info action.
 	 *
@@ -751,12 +663,14 @@ public class CliFrontend {
 	//  Interaction with programs and JobManager
 	// --------------------------------------------------------------------------------------------
 
-	protected void executeProgram(
-			Configuration configuration,
-			PackagedProgram program) throws ProgramMissingJobException, ProgramInvocationException {
+	protected void execute(final Configuration configuration, final PackagedProgram program) throws ProgramInvocationException, FlinkException {
+		checkNotNull(configuration);
+		checkNotNull(program);
+
 		logAndSysout("Starting execution of program");
 
-		JobSubmissionResult result = ClientUtils.executeProgram(new DefaultExecutorServiceLoader(), configuration, program);
+		final ExecutorServiceLoader executorServiceLoader = new DefaultExecutorServiceLoader();
+		final JobSubmissionResult result = ClientUtils.executeProgram(executorServiceLoader, configuration, program);
 
 		if (result.isJobExecutionResult()) {
 			logAndSysout("Program execution finished");
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
index a0d551b..50232ba 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.client.cli;
 
 import org.apache.flink.client.deployment.ClusterClientServiceLoader;
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
-import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -198,7 +197,7 @@ public class CliFrontendRunTest extends CliFrontendTestBase {
 		}
 
 		@Override
-		protected void executeProgram(Configuration configuration, PackagedProgram program, ClusterClient client) {
+		protected void execute(final Configuration configuration, final PackagedProgram program) {
 			final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
 			assertEquals(isDetached, executionConfigAccessor.getDetachedMode());
 			assertEquals(expectedParallelism, executionConfigAccessor.getParallelism());
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
index b627b71..297b17e 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
@@ -32,9 +32,9 @@ import java.util.stream.Collectors;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * todo make it singleton
  * The default implementation of the {@link ExecutorServiceLoader}. This implementation uses
  * Java service discovery to find the available {@link ExecutorFactory executor factories}.
+ * MAKE IT A SINGLETON.
  */
 public class DefaultExecutorServiceLoader implements ExecutorServiceLoader {
 


[flink] 18/19: [FLINK-XXXXX] Deduplicate the Executor code and fix javadocs.

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8212ba8d3721f7fc985036473b76bee32eddbdc7
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Nov 18 10:25:19 2019 +0100

    [FLINK-XXXXX] Deduplicate the Executor code and fix javadocs.
---
 .../deployment/AbstractJobClusterExecutor.java     | 71 ++++++++++++++++++++++
 .../deployment/AbstractSessionClusterExecutor.java | 66 ++++++++++++++++++++
 .../flink/client/deployment/ExecutorUtils.java     | 59 ++++++++++++++++++
 .../StandaloneSessionClusterExecutor.java          | 63 +------------------
 .../org/apache/flink/core/execution/Executor.java  | 13 ++--
 .../yarn/executors/YarnJobClusterExecutor.java     | 68 +--------------------
 .../yarn/executors/YarnSessionClusterExecutor.java | 53 +---------------
 7 files changed, 214 insertions(+), 179 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
new file mode 100644
index 0000000..310dd61
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.client.deployment.executors.JobClientImpl;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An abstract {@link Executor} used to execute {@link Pipeline pipelines} on dedicated (per-job) clusters.
+ *
+ * @param <ClusterID> the type of the id of the cluster.
+ * @param <ClientFactory> the type of the {@link ClusterClientFactory} used to create/retrieve a client to the target cluster.
+ */
+@Internal
+public class AbstractJobClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements Executor {
+
+	private static final Logger LOG = LoggerFactory.getLogger(AbstractJobClusterExecutor.class);
+
+	private final ClientFactory clusterClientFactory;
+
+	public AbstractJobClusterExecutor(@Nonnull final ClientFactory clusterClientFactory) {
+		this.clusterClientFactory = checkNotNull(clusterClientFactory);
+	}
+
+	@Override
+	public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception {
+		final JobGraph jobGraph = ExecutorUtils.getJobGraph(pipeline, configuration);
+
+		try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
+			final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
+
+			final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
+
+			final ClusterClient<ClusterID> client = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode());
+			LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());
+			return CompletableFuture.completedFuture(new JobClientImpl<>(client, jobGraph.getJobID()));
+		}
+	}
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java
new file mode 100644
index 0000000..2150b18
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An abstract {@link Executor} used to execute {@link Pipeline pipelines} on an existing (session) cluster.
+ *
+ * @param <ClusterID> the type of the id of the cluster.
+ * @param <ClientFactory> the type of the {@link ClusterClientFactory} used to create/retrieve a client to the target cluster.
+ */
+@Internal
+public class AbstractSessionClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements Executor {
+
+	private final ClientFactory clusterClientFactory;
+
+	public AbstractSessionClusterExecutor(@Nonnull final ClientFactory clusterClientFactory) {
+		this.clusterClientFactory = checkNotNull(clusterClientFactory);
+	}
+
+	@Override
+	public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception {
+		final JobGraph jobGraph = ExecutorUtils.getJobGraph(pipeline, configuration);
+
+		try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
+			final ClusterID clusterID = clusterClientFactory.getClusterId(configuration);
+			checkState(clusterID != null);
+
+			// the caller should take care of managing the life-cycle of the returned JobClient.
+
+			final ClusterClient<ClusterID> clusterClient = clusterDescriptor.retrieve(clusterID);
+			return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
+		}
+	}
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java
new file mode 100644
index 0000000..af540cb
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.FlinkPipelineTranslationUtil;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import javax.annotation.Nonnull;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utility class with methods related to job execution.
+ */
+public class ExecutorUtils {
+
+	/**
+	 * Creates the {@link JobGraph} corresponding to the provided {@link Pipeline}.
+	 *
+	 * @param pipeline the pipeline whose job graph we are computing
+	 * @param configuration the configuration with the necessary information such as jars and
+	 *                         classpaths to be included, the parallelism of the job and potential
+	 *                         savepoint settings used to bootstrap its state.
+	 * @return the corresponding {@link JobGraph}.
+	 */
+	public static JobGraph getJobGraph(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) {
+		checkNotNull(pipeline);
+		checkNotNull(configuration);
+
+		final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
+		final JobGraph jobGraph = FlinkPipelineTranslationUtil
+				.getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());
+
+		jobGraph.addJars(executionConfigAccessor.getJars());
+		jobGraph.setClasspaths(executionConfigAccessor.getClasspaths());
+		jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
+
+		return jobGraph;
+	}
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
index df4d2ba..f097323 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
@@ -19,77 +19,20 @@
 package org.apache.flink.client.deployment.executors;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.dag.Pipeline;
-import org.apache.flink.client.ClientUtils;
-import org.apache.flink.client.FlinkPipelineTranslationUtil;
-import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.client.deployment.AbstractSessionClusterExecutor;
 import org.apache.flink.client.deployment.StandaloneClientFactory;
-import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
 import org.apache.flink.client.deployment.StandaloneClusterId;
-import org.apache.flink.client.program.rest.RestClusterClient;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.Executor;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-
-import java.net.URL;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * The {@link Executor} to be used when executing a job on an already running cluster.
  */
 @Internal
-public class StandaloneSessionClusterExecutor implements Executor {
+public class StandaloneSessionClusterExecutor extends AbstractSessionClusterExecutor<StandaloneClusterId, StandaloneClientFactory> {
 
 	public static final String NAME = "standalone-session-cluster";
 
-	private final StandaloneClientFactory clusterClientFactory;
-
 	public StandaloneSessionClusterExecutor() {
-		this.clusterClientFactory = new StandaloneClientFactory();
-	}
-
-	@Override
-	public CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration) throws Exception {
-		final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
-
-		final List<URL> dependencies = configAccessor.getJars();
-		final List<URL> classpaths = configAccessor.getClasspaths();
-
-		final JobGraph jobGraph = getJobGraph(pipeline, configuration, classpaths, dependencies);
-
-		try (final StandaloneClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
-			final StandaloneClusterId clusterID = clusterClientFactory.getClusterId(configuration);
-			checkState(clusterID != null);
-
-			final RestClusterClient<StandaloneClusterId> clusterClient = clusterDescriptor.retrieve(clusterID);
-			return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
-		}
-	}
-
-	private JobGraph getJobGraph(
-			final Pipeline pipeline,
-			final Configuration configuration,
-			final List<URL> classpaths,
-			final List<URL> libraries) {
-
-		checkNotNull(pipeline);
-		checkNotNull(configuration);
-		checkNotNull(classpaths);
-		checkNotNull(libraries);
-
-		final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
-		final JobGraph jobGraph = FlinkPipelineTranslationUtil
-				.getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());
-
-		jobGraph.addJars(libraries);
-		jobGraph.setClasspaths(classpaths);
-		jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
-
-		return jobGraph;
+		super(new StandaloneClientFactory());
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
index 5be3193..b585be6 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
@@ -18,10 +18,11 @@
 
 package org.apache.flink.core.execution;
 
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.configuration.Configuration;
 
+import javax.annotation.Nonnull;
+
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -30,11 +31,15 @@ import java.util.concurrent.CompletableFuture;
 public interface Executor {
 
 	/**
-	 * Executes a {@link Pipeline} based on the provided configuration.
+	 * Executes a {@link Pipeline} based on the provided configuration and returns a {@link JobClient} which allows to
+	 * interact with the job being executed, e.g. cancel it or take a savepoint.
+	 *
+	 * <p><b>ATTENTION:</b> The caller is responsible for managing the lifecycle of the returned {@link JobClient}. This
+	 * means that e.g. {@code close()} should be called explicitly at the call-site.
 	 *
 	 * @param pipeline the {@link Pipeline} to execute
 	 * @param configuration the {@link Configuration} with the required execution parameters
-	 * @return the {@link JobExecutionResult} corresponding to the pipeline execution.
+	 * @return a {@link CompletableFuture} with the {@link JobClient} corresponding to the pipeline.
 	 */
-	CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception;
+	CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception;
 }
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
index ead6e9a..084b020 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
@@ -19,28 +19,11 @@
 package org.apache.flink.yarn.executors;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.dag.Pipeline;
-import org.apache.flink.client.FlinkPipelineTranslationUtil;
-import org.apache.flink.client.cli.ExecutionConfigAccessor;
-import org.apache.flink.client.deployment.ClusterSpecification;
-import org.apache.flink.client.deployment.executors.JobClientImpl;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.client.deployment.AbstractJobClusterExecutor;
 import org.apache.flink.core.execution.Executor;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.yarn.YarnClusterClientFactory;
-import org.apache.flink.yarn.YarnClusterDescriptor;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.URL;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * The {@link Executor} to be used when executing a job in isolation.
@@ -48,56 +31,11 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * tear it down when the job is finished either successfully or due to an error.
  */
 @Internal
-public class YarnJobClusterExecutor implements Executor {
-
-	private static final Logger LOG = LoggerFactory.getLogger(YarnJobClusterExecutor.class);
+public class YarnJobClusterExecutor extends AbstractJobClusterExecutor<ApplicationId, YarnClusterClientFactory> {
 
 	public static final String NAME = "yarn-job-cluster";
 
-	private final YarnClusterClientFactory clusterClientFactory;
-
 	public YarnJobClusterExecutor() {
-		this.clusterClientFactory = new YarnClusterClientFactory();
-	}
-
-	@Override
-	public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration executionConfig) throws Exception {
-
-		try (final YarnClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(executionConfig)) {
-			final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(executionConfig);
-
-			final List<URL> dependencies = configAccessor.getJars();
-			final List<URL> classpaths = configAccessor.getClasspaths();
-
-			final JobGraph jobGraph = getJobGraph(pipeline, executionConfig, classpaths, dependencies);
-
-			final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executionConfig);
-
-			final ClusterClient<ApplicationId> client = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode());
-			LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());
-			return CompletableFuture.completedFuture(new JobClientImpl<>(client, jobGraph.getJobID()));
-		}
-	}
-
-	private JobGraph getJobGraph(
-			final Pipeline pipeline,
-			final Configuration configuration,
-			final List<URL> classpaths,
-			final List<URL> libraries) {
-
-		checkNotNull(pipeline);
-		checkNotNull(configuration);
-		checkNotNull(classpaths);
-		checkNotNull(libraries);
-
-		final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
-		final JobGraph jobGraph = FlinkPipelineTranslationUtil
-				.getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());
-
-		jobGraph.addJars(libraries);
-		jobGraph.setClasspaths(classpaths);
-		jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
-
-		return jobGraph;
+		super(new YarnClusterClientFactory());
 	}
 }
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
index de4d148..873dce4 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
@@ -19,68 +19,21 @@
 package org.apache.flink.yarn.executors;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.dag.Pipeline;
-import org.apache.flink.client.ClientUtils;
-import org.apache.flink.client.FlinkPipelineTranslationUtil;
-import org.apache.flink.client.cli.ExecutionConfigAccessor;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.client.deployment.AbstractSessionClusterExecutor;
 import org.apache.flink.core.execution.Executor;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.yarn.YarnClusterClientFactory;
-import org.apache.flink.yarn.YarnClusterDescriptor;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
 /**
  * The {@link Executor} to be used when executing a job on an already running cluster.
  */
 @Internal
-public class YarnSessionClusterExecutor implements Executor {
+public class YarnSessionClusterExecutor extends AbstractSessionClusterExecutor<ApplicationId, YarnClusterClientFactory> {
 
 	public static final String NAME = "yarn-session-cluster";
 
-	private final YarnClusterClientFactory clusterClientFactory;
-
 	public YarnSessionClusterExecutor() {
-		this.clusterClientFactory = new YarnClusterClientFactory();
-	}
-
-	@Override
-	public CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration) throws Exception {
-		final JobGraph jobGraph = getJobGraph(pipeline, configuration);
-
-		try (final YarnClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
-			final ApplicationId clusterID = clusterClientFactory.getClusterId(configuration);
-			checkState(clusterID != null);
-
-			// TODO: 17.11.19 we cannot close the client here because we simply have a future of the client
-			final ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve(clusterID);
-			return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
-		}
-	}
-
-	private JobGraph getJobGraph(
-			final Pipeline pipeline,
-			final Configuration configuration) {
-
-		checkNotNull(pipeline);
-		checkNotNull(configuration);
-
-		final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
-		final JobGraph jobGraph = FlinkPipelineTranslationUtil
-				.getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());
-
-		jobGraph.addJars(executionConfigAccessor.getJars());
-		jobGraph.setClasspaths(executionConfigAccessor.getClasspaths());
-		jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
-
-		return jobGraph;
+		super(new YarnClusterClientFactory());
 	}
 }
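
The contract change worth noting is in Executor.execute(): it now returns a CompletableFuture<JobClient> and, per the new javadoc, the caller owns the returned client and must close it. A minimal sketch of the caller side implied by that contract; instantiating the standalone session executor directly and the surrounding method are for illustration only (in practice the executor comes from the ExecutorServiceLoader):

    import org.apache.flink.api.dag.Pipeline;
    import org.apache.flink.client.deployment.executors.StandaloneSessionClusterExecutor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.execution.Executor;
    import org.apache.flink.core.execution.JobClient;

    static void submitSketch(final Pipeline pipeline, final Configuration configuration) throws Exception {
        final Executor executor = new StandaloneSessionClusterExecutor();

        // execute() only submits; the future completes with a client for the running job
        final JobClient jobClient = executor.execute(pipeline, configuration).get();
        try {
            // interact with the running job here, e.g. monitor it or trigger a savepoint
        } finally {
            jobClient.close();   // caller-managed lifecycle, as the Executor javadoc demands
        }
    }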


[flink] 07/19: [hotfix] code style fix

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7528d5c8082b75cad49d34189c55bfa9397fe954
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Fri Nov 15 15:38:31 2019 +0100

    [hotfix] code style fix
---
 .../src/main/java/org/apache/flink/client/RemoteExecutor.java         | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index c9a041d..51fb99f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -124,9 +124,7 @@ public class RemoteExecutor extends PlanExecutor {
 		checkNotNull(jobGraph);
 		checkNotNull(classLoader);
 
-		try (ClusterClient<?> client = new RestClusterClient<>(
-				clientConfiguration,
-				"RemoteExecutor")) {
+		try (ClusterClient<?> client = new RestClusterClient<>(clientConfiguration, "RemoteExecutor")) {
 			return ClientUtils.submitJobAndWaitForResult(client, jobGraph, classLoader).getJobExecutionResult();
 		}
 	}
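
Besides the line-length cleanup, the pattern here is worth keeping in mind: the RestClusterClient is opened in a try-with-resources block, so it is closed even when submission fails. A small hedged sketch of the same idea, with the configuration, job graph and class loader assumed to be given:

    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.client.ClientUtils;
    import org.apache.flink.client.program.ClusterClient;
    import org.apache.flink.client.program.rest.RestClusterClient;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.jobgraph.JobGraph;

    static JobExecutionResult submitAndWait(
            final Configuration clientConfiguration,
            final JobGraph jobGraph,
            final ClassLoader classLoader) throws Exception {

        // ClusterClient is closeable; try-with-resources guarantees the REST client is released
        try (ClusterClient<?> client = new RestClusterClient<>(clientConfiguration, "RemoteExecutor")) {
            return ClientUtils.submitJobAndWaitForResult(client, jobGraph, classLoader).getJobExecutionResult();
        }
    }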


[flink] 19/19: [FLINK-XXXXX] Make DefaultExecutorServiceLoader a singleton.

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d29867f427f132f46ebedee61702c2969048f7de
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Nov 18 11:27:35 2019 +0100

    [FLINK-XXXXX] Make DefaultExecutorServiceLoader a singleton.
---
 .../src/main/java/org/apache/flink/client/cli/CliFrontend.java   | 2 +-
 .../flink/core/execution/DefaultExecutorServiceLoader.java       | 9 ++++++++-
 .../java/org/apache/flink/api/java/ExecutionEnvironment.java     | 2 +-
 .../streaming/api/environment/StreamExecutionEnvironment.java    | 2 +-
 4 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 8dfe306..408d478 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -669,7 +669,7 @@ public class CliFrontend {
 
 		logAndSysout("Starting execution of program");
 
-		final ExecutorServiceLoader executorServiceLoader = new DefaultExecutorServiceLoader();
+		final ExecutorServiceLoader executorServiceLoader = DefaultExecutorServiceLoader.INSTANCE;
 		final JobSubmissionResult result = ClientUtils.executeProgram(executorServiceLoader, configuration, program);
 
 		if (result.isJobExecutionResult()) {
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
index 297b17e..7122167 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.core.execution;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
 
 import org.slf4j.Logger;
@@ -34,8 +35,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * The default implementation of the {@link ExecutorServiceLoader}. This implementation uses
  * Java service discovery to find the available {@link ExecutorFactory executor factories}.
- * MAKE IT A SINGLETON.
  */
+@Internal
 public class DefaultExecutorServiceLoader implements ExecutorServiceLoader {
 
 	// TODO: This code is almost identical to the ClusterClientServiceLoader and its default implementation.
@@ -46,6 +47,12 @@ public class DefaultExecutorServiceLoader implements ExecutorServiceLoader {
 
 	private static final ServiceLoader<ExecutorFactory> defaultLoader = ServiceLoader.load(ExecutorFactory.class);
 
+	public static final DefaultExecutorServiceLoader INSTANCE = new DefaultExecutorServiceLoader();
+
+	private DefaultExecutorServiceLoader() {
+		// make sure nobody instantiates us explicitly.
+	}
+
 	@Override
 	public ExecutorFactory getExecutorFactory(final Configuration configuration) {
 		checkNotNull(configuration);
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index c600eb9..df2cb0c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -143,7 +143,7 @@ public class ExecutionEnvironment {
 	}
 
 	protected ExecutionEnvironment(final Configuration executorConfiguration) {
-		this(new DefaultExecutorServiceLoader(), executorConfiguration);
+		this(DefaultExecutorServiceLoader.INSTANCE, executorConfiguration);
 	}
 
 	protected ExecutionEnvironment(final ExecutorServiceLoader executorServiceLoader, final Configuration executorConfiguration) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index fdaaae0..c51064f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -166,7 +166,7 @@ public class StreamExecutionEnvironment {
 	}
 
 	public StreamExecutionEnvironment(final Configuration executorConfiguration) {
-		this(new DefaultExecutorServiceLoader(), executorConfiguration);
+		this(DefaultExecutorServiceLoader.INSTANCE, executorConfiguration);
 	}
 
 	public StreamExecutionEnvironment(
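
Since the loader holds only a static ServiceLoader and no per-instance state, a single shared instance suffices, which is what the private constructor plus the public INSTANCE field enforce. Call sites simply pass the constant instead of constructing their own loader. A short sketch of the lookup; the target value and the wrapper method are illustrative, and running it needs a matching ExecutorFactory on the classpath:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
    import org.apache.flink.core.execution.ExecutorFactory;
    import org.apache.flink.core.execution.ExecutorServiceLoader;

    static ExecutorFactory lookupFactorySketch() {
        final ExecutorServiceLoader loader = DefaultExecutorServiceLoader.INSTANCE;  // shared singleton

        final Configuration configuration = new Configuration();
        configuration.setString("execution.target", "standalone-session-cluster");   // assumed target name

        // factories are found via Java service discovery and matched against the configuration
        return loader.getExecutorFactory(configuration);
    }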


[flink] 13/19: Update ContextEnvironments + Deactivated test!!! TO RE-ACTIVATE

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b771f256eecd2cc4362c267f5f627b3cacf0bdd9
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Sun Nov 17 13:16:42 2019 +0100

    Update ContextEnvironments + Deactivated test!!! TO RE-ACTIVATE
---
 .../java/org/apache/flink/client/ClientUtils.java  |   8 +-
 .../org/apache/flink/client/cli/CliFrontend.java   |   8 +-
 .../flink/client/program/ContextEnvironment.java   |  98 +++--------
 .../client/program/ContextEnvironmentFactory.java  |  18 +--
 .../apache/flink/client/program/ClientTest.java    | 179 ++++++++++-----------
 .../execution/DefaultExecutorServiceLoader.java    |   1 +
 .../api/environment/StreamContextEnvironment.java  |  33 +---
 7 files changed, 139 insertions(+), 206 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index 4a95a16..5824832 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -30,6 +30,8 @@ import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.client.program.ProgramMissingJobException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
+import org.apache.flink.core.execution.ExecutorServiceLoader;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -139,10 +141,10 @@ public enum ClientUtils {
 	}
 
 	public static JobSubmissionResult executeProgram(
+			ExecutorServiceLoader executorServiceLoader,
 			Configuration configuration,
-			ClusterClient<?> client,
 			PackagedProgram program) throws ProgramMissingJobException, ProgramInvocationException {
-
+		checkNotNull(executorServiceLoader);
 		final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
 
 		final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
@@ -156,8 +158,8 @@ public enum ClientUtils {
 			final AtomicReference<JobExecutionResult> jobExecutionResult = new AtomicReference<>();
 
 			ContextEnvironmentFactory factory = new ContextEnvironmentFactory(
+					executorServiceLoader,
 					configuration,
-					client,
 					userCodeClassLoader,
 					jobExecutionResult);
 			ContextEnvironment.setAsContext(factory);
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 5ed2901..258708a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -44,6 +44,7 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -291,7 +292,7 @@ public class CliFrontend {
 					int userParallelism = executionParameters.getParallelism();
 					LOG.debug("User parallelism is set to {}", userParallelism);
 
-					executeProgram(configuration, program, client);
+					executeProgram(configuration, program);
 				} finally {
 					if (clusterId == null && !executionParameters.getDetachedMode()) {
 						// terminate the cluster only if we have started it before and if it's not detached
@@ -752,11 +753,10 @@ public class CliFrontend {
 
 	protected void executeProgram(
 			Configuration configuration,
-			PackagedProgram program,
-			ClusterClient<?> client) throws ProgramMissingJobException, ProgramInvocationException {
+			PackagedProgram program) throws ProgramMissingJobException, ProgramInvocationException {
 		logAndSysout("Starting execution of program");
 
-		JobSubmissionResult result = ClientUtils.executeProgram(configuration, client, program);
+		JobSubmissionResult result = ClientUtils.executeProgram(new DefaultExecutorServiceLoader(), configuration, program);
 
 		if (result.isJobExecutionResult()) {
 			logAndSysout("Program execution finished");
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 9d3927a..2cc1d69 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -21,17 +21,12 @@ package org.apache.flink.client.program;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.client.ClientUtils;
-import org.apache.flink.client.FlinkPipelineTranslationUtil;
-import org.apache.flink.client.cli.ExecutionConfigAccessor;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.ExecutorServiceLoader;
 
-import java.net.URL;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -41,75 +36,54 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class ContextEnvironment extends ExecutionEnvironment {
 
-	private final ClusterClient<?> client;
+	private final ExecutorServiceLoader executorServiceLoader;
 
-	private final boolean detached;
+	private final Configuration configuration;
 
-	private final List<URL> jarFilesToAttach;
-
-	private final List<URL> classpathsToAttach;
-
-	private final ClassLoader userCodeClassLoader;
-
-	private final SavepointRestoreSettings savepointSettings;
+	private final ClassLoader userClassloader;
 
 	private final AtomicReference<JobExecutionResult> jobExecutionResult;
 
 	private boolean alreadyCalled;
 
-	public ContextEnvironment(
+	ContextEnvironment(
+			final ExecutorServiceLoader executorServiceLoader,
 			final Configuration configuration,
-			final ClusterClient<?> remoteConnection,
 			final ClassLoader userCodeClassLoader,
 			final AtomicReference<JobExecutionResult> jobExecutionResult) {
+		super(executorServiceLoader, configuration);
 
-		final ExecutionConfigAccessor accessor = ExecutionConfigAccessor
-				.fromConfiguration(checkNotNull(configuration));
-
-		this.jarFilesToAttach = accessor.getJars();
-		this.classpathsToAttach = accessor.getClasspaths();
-		this.savepointSettings = accessor.getSavepointRestoreSettings();
-		this.detached = accessor.getDetachedMode();
+		this.executorServiceLoader = checkNotNull(executorServiceLoader);
+		this.configuration = checkNotNull(configuration);
+		this.userClassloader = checkNotNull(userCodeClassLoader);
+		this.jobExecutionResult = checkNotNull(jobExecutionResult);
+		this.alreadyCalled = false;
 
-		final int parallelism = accessor.getParallelism();
+		final int parallelism = configuration.get(CoreOptions.DEFAULT_PARALLELISM);
 		if (parallelism > 0) {
 			setParallelism(parallelism);
 		}
+		super.setUserClassloader(userCodeClassLoader);
+	}
 
-		this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
-		this.jobExecutionResult = checkNotNull(jobExecutionResult);
-		this.client = checkNotNull(remoteConnection);
+	public ExecutorServiceLoader getExecutorServiceLoader() {
+		return executorServiceLoader;
+	}
 
-		this.alreadyCalled = false;
+	public Configuration getConfiguration() {
+		return configuration;
 	}
 
 	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
 		verifyExecuteIsCalledOnceWhenInDetachedMode();
-
-		Plan plan = createProgramPlan(jobName);
-
-		JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(
-				plan,
-				client.getFlinkConfiguration(),
-				getParallelism());
-
-		jobGraph.addJars(this.jarFilesToAttach);
-		jobGraph.setClasspaths(this.classpathsToAttach);
-
-		if (detached) {
-			lastJobExecutionResult = ClientUtils.submitJob(client, jobGraph);
-		} else {
-			lastJobExecutionResult = ClientUtils.submitJobAndWaitForResult(client, jobGraph, userCodeClassLoader).getJobExecutionResult();
-		}
-
+		lastJobExecutionResult = super.execute(jobName);
 		setJobExecutionResult(lastJobExecutionResult);
-
 		return lastJobExecutionResult;
 	}
 
 	private void verifyExecuteIsCalledOnceWhenInDetachedMode() {
-		if (alreadyCalled && detached) {
+		if (alreadyCalled && !configuration.getBoolean(DeploymentOptions.ATTACHED)) {
 			throw new InvalidProgramException(DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.EXECUTE_TWICE_MESSAGE);
 		}
 		alreadyCalled = true;
@@ -124,28 +98,8 @@ public class ContextEnvironment extends ExecutionEnvironment {
 		return "Context Environment (parallelism = " + (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? "default" : getParallelism()) + ")";
 	}
 
-	public ClusterClient<?> getClient() {
-		return this.client;
-	}
-
-	public List<URL> getJars(){
-		return jarFilesToAttach;
-	}
-
-	public List<URL> getClasspaths(){
-		return classpathsToAttach;
-	}
-
-	public ClassLoader getUserCodeClassLoader() {
-		return userCodeClassLoader;
-	}
-
-	public SavepointRestoreSettings getSavepointRestoreSettings() {
-		return savepointSettings;
-	}
-
-	public boolean isDetached() {
-		return detached;
+	public ClassLoader getUserClassloader() {
+		return userClassloader;
 	}
 
 	// --------------------------------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
index f1c9ad6..ec68a13 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.ExecutorServiceLoader;
 
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -36,9 +37,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
 
-	private final Configuration configuration;
+	private final ExecutorServiceLoader executorServiceLoader;
 
-	private final ClusterClient<?> client;
+	private final Configuration configuration;
 
 	private final ClassLoader userCodeClassLoader;
 
@@ -47,13 +48,12 @@ public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
 	private boolean alreadyCalled;
 
 	public ContextEnvironmentFactory(
+			final ExecutorServiceLoader executorServiceLoader,
 			final Configuration configuration,
-			final ClusterClient<?> client,
 			final ClassLoader userCodeClassLoader,
 			final AtomicReference<JobExecutionResult> jobExecutionResult) {
-
+		this.executorServiceLoader = checkNotNull(executorServiceLoader);
 		this.configuration = checkNotNull(configuration);
-		this.client = checkNotNull(client);
 		this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
 		this.jobExecutionResult = checkNotNull(jobExecutionResult);
 		this.alreadyCalled = false;
@@ -63,10 +63,10 @@ public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
 	public ExecutionEnvironment createExecutionEnvironment() {
 		verifyCreateIsCalledOnceWhenInDetachedMode();
 		return new ContextEnvironment(
-			configuration,
-			client,
-			userCodeClassLoader,
-			jobExecutionResult);
+				executorServiceLoader,
+				configuration,
+				userCodeClassLoader,
+				jobExecutionResult);
 	}
 
 	private void verifyCreateIsCalledOnceWhenInDetachedMode() {
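
Illustrative sketch (not part of the patch): the factory now takes an ExecutorServiceLoader in place of the ClusterClient, so wiring it up looks roughly as below. The helper class name is made up, and the no-arg DefaultExecutorServiceLoader constructor is an assumption at this point in the branch (the diff further down still carries a todo to make it a singleton).

    import java.util.concurrent.atomic.AtomicReference;

    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.client.program.ContextEnvironmentFactory;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.execution.DefaultExecutorServiceLoader;

    public class ContextFactoryWiringSketch {

        public static ContextEnvironmentFactory create(
                Configuration configuration,
                ClassLoader userCodeClassLoader) {
            // New argument order after this commit: the executor service loader comes first,
            // the ClusterClient is gone entirely.
            return new ContextEnvironmentFactory(
                    new DefaultExecutorServiceLoader(), // assumed no-arg constructor
                    configuration,
                    userCodeClassLoader,
                    new AtomicReference<JobExecutionResult>());
        }
    }
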
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 6e53abb..2b9c038 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.client.program;
 
-import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.Plan;
@@ -53,18 +52,12 @@ import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import java.net.URL;
 import java.util.Collections;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * Simple and maybe stupid test to check the {@link ClusterClient} class.
@@ -107,67 +100,67 @@ public class ClientTest extends TestLogger {
 		return configuration;
 	}
 
-	/**
-	 * Tests that invalid detached mode programs fail.
-	 */
-	@Test
-	public void testDetachedMode() throws Exception{
-		final ClusterClient<?> clusterClient = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster());
-		try {
-			PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestExecuteTwice.class.getName()).build();
-			final Configuration configuration = fromPackagedProgram(prg, 1, true);
-			ClientUtils.executeProgram(configuration, clusterClient, prg);
-			fail(FAIL_MESSAGE);
-		} catch (ProgramInvocationException e) {
-			assertEquals(
-					DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.EXECUTE_TWICE_MESSAGE,
-					e.getCause().getMessage());
-		}
-
-		try {
-			PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestEager.class.getName()).build();
-			final Configuration configuration = fromPackagedProgram(prg, 1, true);
-			ClientUtils.executeProgram(configuration, clusterClient, prg);
-			fail(FAIL_MESSAGE);
-		} catch (ProgramInvocationException e) {
-			assertEquals(
-					DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.JOB_RESULT_MESSAGE + DetachedJobExecutionResult.EAGER_FUNCTION_MESSAGE,
-					e.getCause().getMessage());
-		}
-
-		try {
-			PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetRuntime.class.getName()).build();
-			final Configuration configuration = fromPackagedProgram(prg, 1, true);
-			ClientUtils.executeProgram(configuration, clusterClient, prg);
-			fail(FAIL_MESSAGE);
-		} catch (ProgramInvocationException e) {
-			assertEquals(
-					DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.JOB_RESULT_MESSAGE,
-					e.getCause().getMessage());
-		}
-
-		try {
-			PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetAccumulator.class.getName()).build();
-			final Configuration configuration = fromPackagedProgram(prg, 1, true);
-			ClientUtils.executeProgram(configuration, clusterClient, prg);
-			fail(FAIL_MESSAGE);
-		} catch (ProgramInvocationException e) {
-			assertEquals(
-					DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.JOB_RESULT_MESSAGE + DetachedJobExecutionResult.EAGER_FUNCTION_MESSAGE,
-					e.getCause().getMessage());
-		}
-
-		try {
-			PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetAllAccumulator.class.getName()).build();
-			final Configuration configuration = fromPackagedProgram(prg, 1, true);
-			ClientUtils.executeProgram(configuration, clusterClient, prg);
-			fail(FAIL_MESSAGE);
-		} catch (ProgramInvocationException e) {
-			assertEquals(
-					DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.JOB_RESULT_MESSAGE,
-					e.getCause().getMessage());
-		}
-	}
+//	/**
+//	 * Tests that invalid detached mode programs fail.
+//	 */
+//	@Test
+//	public void testDetachedMode() throws Exception{
+//		final ClusterClient<?> clusterClient = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster());
+//		try {
+//			PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestExecuteTwice.class.getName()).build();
+//			final Configuration configuration = fromPackagedProgram(prg, 1, true);
+//			ClientUtils.executeProgram(configuration, clusterClient, prg);
+//			fail(FAIL_MESSAGE);
+//		} catch (ProgramInvocationException e) {
+//			assertEquals(
+//					DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.EXECUTE_TWICE_MESSAGE,
+//					e.getCause().getMessage());
+//		}
+//
+//		try {
+//			PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestEager.class.getName()).build();
+//			final Configuration configuration = fromPackagedProgram(prg, 1, true);
+//			ClientUtils.executeProgram(configuration, clusterClient, prg);
+//			fail(FAIL_MESSAGE);
+//		} catch (ProgramInvocationException e) {
+//			assertEquals(
+//					DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.JOB_RESULT_MESSAGE + DetachedJobExecutionResult.EAGER_FUNCTION_MESSAGE,
+//					e.getCause().getMessage());
+//		}
+//
+//		try {
+//			PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetRuntime.class.getName()).build();
+//			final Configuration configuration = fromPackagedProgram(prg, 1, true);
+//			ClientUtils.executeProgram(configuration, clusterClient, prg);
+//			fail(FAIL_MESSAGE);
+//		} catch (ProgramInvocationException e) {
+//			assertEquals(
+//					DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.JOB_RESULT_MESSAGE,
+//					e.getCause().getMessage());
+//		}
+//
+//		try {
+//			PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetAccumulator.class.getName()).build();
+//			final Configuration configuration = fromPackagedProgram(prg, 1, true);
+//			ClientUtils.executeProgram(configuration, clusterClient, prg);
+//			fail(FAIL_MESSAGE);
+//		} catch (ProgramInvocationException e) {
+//			assertEquals(
+//					DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.JOB_RESULT_MESSAGE + DetachedJobExecutionResult.EAGER_FUNCTION_MESSAGE,
+//					e.getCause().getMessage());
+//		}
+//
+//		try {
+//			PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetAllAccumulator.class.getName()).build();
+//			final Configuration configuration = fromPackagedProgram(prg, 1, true);
+//			ClientUtils.executeProgram(configuration, clusterClient, prg);
+//			fail(FAIL_MESSAGE);
+//		} catch (ProgramInvocationException e) {
+//			assertEquals(
+//					DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.JOB_RESULT_MESSAGE,
+//					e.getCause().getMessage());
+//		}
+//	}
 
 	/**
 	 * This test verifies correct job submission messaging logic and plan translation calls.
@@ -191,31 +184,31 @@ public class ClientTest extends TestLogger {
 	 * This test verifies that the local execution environment cannot be created when
 	 * the program is submitted through a client.
 	 */
-	@Test
-	public void tryLocalExecution() throws ProgramInvocationException, ProgramMissingJobException {
-		PackagedProgram packagedProgramMock = mock(PackagedProgram.class);
-
-		when(packagedProgramMock.getUserCodeClassLoader())
-				.thenReturn(packagedProgramMock.getClass().getClassLoader());
-
-		doAnswer(new Answer<Void>() {
-			@Override
-			public Void answer(InvocationOnMock invocation) throws Throwable {
-				ExecutionEnvironment.createLocalEnvironment();
-				return null;
-			}
-		}).when(packagedProgramMock).invokeInteractiveModeForExecution();
-
-		try {
-			final ClusterClient<?> client = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster());
-			final Configuration configuration = fromPackagedProgram(packagedProgramMock, 1, true);
-			ClientUtils.executeProgram(configuration, client, packagedProgramMock);
-			fail("Creating the local execution environment should not be possible");
-		}
-		catch (InvalidProgramException e) {
-			// that is what we want
-		}
-	}
+//	@Test
+//	public void tryLocalExecution() throws ProgramInvocationException, ProgramMissingJobException {
+//		PackagedProgram packagedProgramMock = mock(PackagedProgram.class);
+//
+//		when(packagedProgramMock.getUserCodeClassLoader())
+//				.thenReturn(packagedProgramMock.getClass().getClassLoader());
+//
+//		doAnswer(new Answer<Void>() {
+//			@Override
+//			public Void answer(InvocationOnMock invocation) throws Throwable {
+//				ExecutionEnvironment.createLocalEnvironment();
+//				return null;
+//			}
+//		}).when(packagedProgramMock).invokeInteractiveModeForExecution();
+//
+//		try {
+//			final ClusterClient<?> client = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster());
+//			final Configuration configuration = fromPackagedProgram(packagedProgramMock, 1, true);
+//			ClientUtils.executeProgram(configuration, client, packagedProgramMock);
+//			fail("Creating the local execution environment should not be possible");
+//		}
+//		catch (InvalidProgramException e) {
+//			// that is what we want
+//		}
+//	}
 
 	@Test
 	public void testGetExecutionPlan() throws ProgramInvocationException {
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
index 64c0034..b627b71 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
@@ -32,6 +32,7 @@ import java.util.stream.Collectors;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
+ * TODO: make this loader a singleton.
  * The default implementation of the {@link ExecutorServiceLoader}. This implementation uses
  * Java service discovery to find the available {@link ExecutorFactory executor factories}.
  */
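
The javadoc above describes looking up executor factories through Java service discovery. Below is a generic sketch of that mechanism only; it does not reproduce the actual ExecutorFactory SPI, and the nested interface and its method name are stand-ins defined purely for the example.

    import java.util.ServiceLoader;

    import org.apache.flink.configuration.Configuration;

    public class ServiceDiscoverySketch {

        /** Stand-in for the executor factory SPI; the method name is an assumption. */
        public interface CompatibilityCheckedFactory {
            boolean isCompatibleWith(Configuration configuration);
        }

        // Scans the META-INF/services entries on the classpath and returns the first
        // factory that declares itself compatible with the given configuration.
        public static CompatibilityCheckedFactory discover(Configuration configuration) {
            for (CompatibilityCheckedFactory factory : ServiceLoader.load(CompatibilityCheckedFactory.class)) {
                if (factory.isCompatibleWith(configuration)) {
                    return factory;
                }
            }
            throw new IllegalStateException("No compatible factory found on the classpath.");
        }
    }
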
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index bab31d3..5ca660f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -19,12 +19,11 @@ package org.apache.flink.streaming.api.environment;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.client.ClientUtils;
-import org.apache.flink.client.FlinkPipelineTranslationUtil;
 import org.apache.flink.client.program.ContextEnvironment;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Special {@link StreamExecutionEnvironment} that will be used in cases where the CLI client or
  * testing utilities create a {@link StreamExecutionEnvironment} that should be used when
@@ -35,37 +34,21 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 
 	private final ContextEnvironment ctx;
 
-	protected StreamContextEnvironment(ContextEnvironment ctx) {
-		this.ctx = ctx;
+	StreamContextEnvironment(ContextEnvironment ctx) {
+		super(ctx.getExecutorServiceLoader(), ctx.getConfiguration());
+		this.ctx = checkNotNull(ctx);
+
 		if (ctx.getParallelism() > 0) {
 			setParallelism(ctx.getParallelism());
 		}
+		setUserClassloader(ctx.getUserClassloader());
 	}
 
 	@Override
 	public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
 		transformations.clear();
-
-		JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(
-				streamGraph,
-				ctx.getClient().getFlinkConfiguration(),
-				getParallelism());
-
-		jobGraph.addJars(ctx.getJars());
-		jobGraph.setClasspaths(ctx.getClasspaths());
-
-		// running from the CLI will override the savepoint restore settings
-		jobGraph.setSavepointRestoreSettings(ctx.getSavepointRestoreSettings());
-
-		JobExecutionResult jobExecutionResult;
-		if (ctx.isDetached()) {
-			jobExecutionResult = ClientUtils.submitJob(ctx.getClient(), jobGraph);
-		} else {
-			jobExecutionResult = ClientUtils.submitJobAndWaitForResult(ctx.getClient(), jobGraph, ctx.getUserCodeClassLoader());
-		}
-
+		JobExecutionResult jobExecutionResult = super.execute(streamGraph);
 		ctx.setJobExecutionResult(jobExecutionResult);
-
 		return jobExecutionResult;
 	}
 }
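
Both environments now funnel execution through super.execute(...), which is expected to pick an executor via the ExecutorServiceLoader and the Configuration. The following is a purely illustrative sketch of that shape; every type and method name in it is a stand-in, not an actual Flink interface.

    import java.util.concurrent.CompletableFuture;

    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.configuration.Configuration;

    public class ExecuteFlowSketch {

        /** Stand-in for an executor selected from the configured deployment target. */
        public interface HypotheticalExecutor {
            CompletableFuture<JobExecutionResult> execute(Object pipeline, Configuration configuration);
        }

        /** Stand-in for the factory resolved through service discovery. */
        public interface HypotheticalExecutorFactory {
            HypotheticalExecutor getExecutor(Configuration configuration);
        }

        // configuration -> factory -> executor -> result: the environment no longer
        // talks to a ClusterClient directly.
        public static JobExecutionResult run(
                HypotheticalExecutorFactory factory,
                Object pipeline,
                Configuration configuration) throws Exception {
            return factory.getExecutor(configuration)
                    .execute(pipeline, configuration)
                    .get();
        }
    }
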