You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/18 21:02:50 UTC
[flink] 09/14: [FLINK-XXXXX] Update the Executor interface and
introduce the JobClient
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch executors-clean
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 98e93d522dba8218b44024290862e5424fc35b39
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Nov 18 16:31:04 2019 +0100
[FLINK-XXXXX] Update the Executor interface and introduce the JobClient
---
.../deployment/AbstractJobClusterExecutor.java | 4 ++
.../deployment/AbstractSessionClusterExecutor.java | 4 ++
.../flink/client/deployment/ExecutorUtils.java | 4 ++
.../flink/client/deployment/JobClientImpl.java | 4 ++
.../org/apache/flink/core/execution/Executor.java | 13 +++--
.../execution/{Executor.java => JobClient.java} | 31 ++++++++----
.../flink/api/java/ExecutionEnvironment.java | 19 ++++++-
...java => ExecutorDiscoveryAndJobClientTest.java} | 59 ++++++++++++++++++----
...org.apache.flink.core.execution.ExecutorFactory | 2 +-
.../environment/StreamExecutionEnvironment.java | 20 +++++++-
...java => ExecutorDiscoveryAndJobClientTest.java} | 59 ++++++++++++++++++----
...org.apache.flink.core.execution.ExecutorFactory | 2 +-
12 files changed, 180 insertions(+), 41 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
new file mode 100644
index 0000000..14a93bc
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
@@ -0,0 +1,4 @@
+package org.apache.flink.client.deployment;
+
+public class AbstractJobClusterExecutor {
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java
new file mode 100644
index 0000000..ab8cc01
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java
@@ -0,0 +1,4 @@
+package org.apache.flink.client.deployment;
+
+public class AbstractSessionClusterExecutor {
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java
new file mode 100644
index 0000000..e134206
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java
@@ -0,0 +1,4 @@
+package org.apache.flink.client.deployment;
+
+public class ExecutorUtils {
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/JobClientImpl.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/JobClientImpl.java
new file mode 100644
index 0000000..e811fc8
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/JobClientImpl.java
@@ -0,0 +1,4 @@
+package org.apache.flink.client.deployment;
+
+public class JobClientImpl {
+}
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
index 7069e70..1d606e8 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
@@ -19,12 +19,13 @@
package org.apache.flink.core.execution;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.configuration.Configuration;
import javax.annotation.Nonnull;
+import java.util.concurrent.CompletableFuture;
+
/**
* The entity responsible for executing a {@link Pipeline}, i.e. a user job.
*/
@@ -32,11 +33,15 @@ import javax.annotation.Nonnull;
public interface Executor {
/**
- * Executes a {@link Pipeline} based on the provided configuration.
+ * Executes a {@link Pipeline} based on the provided configuration and returns a {@link JobClient} that can be used to
+ * interact with the job being executed, e.g. cancel it or take a savepoint.
+ *
+ * <p><b>ATTENTION:</b> The caller is responsible for managing the lifecycle of the returned {@link JobClient}. This
+ * means that e.g. {@code close()} should be called explicitly at the call-site.
*
* @param pipeline the {@link Pipeline} to execute
* @param configuration the {@link Configuration} with the required execution parameters
- * @return the {@link JobExecutionResult} corresponding to the pipeline execution.
+ * @return a {@link CompletableFuture} with the {@link JobClient} corresponding to the pipeline.
*/
- JobExecutionResult execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception;
+ CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception;
}
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
similarity index 53%
copy from flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
copy to flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
index 7069e70..06310bd 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
@@ -18,25 +18,34 @@
package org.apache.flink.core.execution;
-import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.dag.Pipeline;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.common.JobID;
import javax.annotation.Nonnull;
+import java.util.concurrent.CompletableFuture;
+
/**
- * The entity responsible for executing a {@link Pipeline}, i.e. a user job.
+ * A client that is scoped to a specific job.
*/
-@Internal
-public interface Executor {
+@PublicEvolving
+public interface JobClient extends AutoCloseable {
+
+ /**
+ * Returns the {@link JobID} that uniquely identifies the job this client is scoped to.
+ */
+ JobID getJobID();
+
+ /**
+ * Returns the result of the job submission which will also contain the job id of the submitted job.
+ */
+ CompletableFuture<JobExecutionResult> getJobSubmissionResult();
/**
- * Executes a {@link Pipeline} based on the provided configuration.
+ * Returns the {@link JobExecutionResult result of the job execution} of the submitted job.
*
- * @param pipeline the {@link Pipeline} to execute
- * @param configuration the {@link Configuration} with the required execution parameters
- * @return the {@link JobExecutionResult} corresponding to the pipeline execution.
+ * @param userClassloader the classloader used to de-serialize the accumulators of the job.
*/
- JobExecutionResult execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception;
+ CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull final ClassLoader userClassloader);
}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 26632e6..d485105 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -57,6 +57,7 @@ import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.core.execution.Executor;
import org.apache.flink.core.execution.ExecutorFactory;
import org.apache.flink.core.execution.ExecutorServiceLoader;
+import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.StringValue;
import org.apache.flink.util.NumberSequenceIterator;
@@ -132,6 +133,8 @@ public class ExecutionEnvironment {
private final Configuration configuration;
+ private ClassLoader userClassloader;
+
/**
* Creates a new Execution Environment.
*/
@@ -146,6 +149,11 @@ public class ExecutionEnvironment {
protected ExecutionEnvironment(final ExecutorServiceLoader executorServiceLoader, final Configuration executorConfiguration) {
this.executorServiceLoader = checkNotNull(executorServiceLoader);
this.configuration = checkNotNull(executorConfiguration);
+ this.userClassloader = getClass().getClassLoader();
+ }
+
+ protected void setUserClassloader(final ClassLoader userClassloader) {
+ this.userClassloader = checkNotNull(userClassloader);
}
protected Configuration getConfiguration() {
@@ -796,8 +804,15 @@ public class ExecutionEnvironment {
executorServiceLoader.getExecutorFactory(configuration);
final Executor executor = executorFactory.getExecutor(configuration);
- lastJobExecutionResult = executor.execute(plan, configuration);
- return lastJobExecutionResult;
+
+ try (final JobClient jobClient = executor.execute(plan, configuration).get()) {
+
+ lastJobExecutionResult = configuration.getBoolean(DeploymentOptions.ATTACHED)
+ ? jobClient.getJobExecutionResult(userClassloader).get()
+ : jobClient.getJobSubmissionResult().get();
+
+ return lastJobExecutionResult;
+ }
}
private void consolidateParallelismDefinitionsInConfiguration() {
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryAndJobClientTest.java
similarity index 60%
rename from flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
rename to flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryAndJobClientTest.java
index 2d46915..c674c7e 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryAndJobClientTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.Executor;
import org.apache.flink.core.execution.ExecutorFactory;
+import org.apache.flink.core.execution.JobClient;
import org.apache.flink.util.OptionalFailure;
import org.junit.Test;
@@ -34,25 +35,43 @@ import javax.annotation.Nonnull;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsEqual.equalTo;
/**
- * Tests the {@link ExecutorFactory} discovery in the {@link ExecutionEnvironment}.
+ * Tests the {@link ExecutorFactory} discovery in the {@link ExecutionEnvironment} and the calls to the {@link JobClient}.
*/
-public class ExecutorDiscoveryTest {
+public class ExecutorDiscoveryAndJobClientTest {
+
+ private static final String EXEC_NAME = "test-executor";
+ private static final long ATTACHED_RUNTIME = 42L;
+ private static final long DETACHED_RUNTIME = 11L;
+
+ @Test
+ public void jobClientGetJobExecutionResultShouldBeCalledOnAttachedExecution() throws Exception {
+ testHelper(true, ATTACHED_RUNTIME);
+ }
@Test
- public void correctExecutorShouldBeInstantiatedBasedOnConfigurationOption() throws Exception {
+ public void jobClientGetJobExecutionResultShouldBeCalledOnDetachedExecution() throws Exception {
+ testHelper(false, DETACHED_RUNTIME);
+ }
+
+ private void testHelper(final boolean attached, final long expectedRuntime) throws Exception {
final Configuration configuration = new Configuration();
- configuration.set(DeploymentOptions.TARGET, IDReportingExecutorFactory.ID);
+ configuration.set(DeploymentOptions.TARGET, EXEC_NAME);
+ configuration.set(DeploymentOptions.ATTACHED, attached);
final JobExecutionResult result = executeTestJobBasedOnConfig(configuration);
final String executorName = result.getAllAccumulatorResults().get(DeploymentOptions.TARGET.key()).toString();
- assertThat(executorName, is(equalTo(IDReportingExecutorFactory.ID)));
+ assertThat(executorName, is(equalTo(EXEC_NAME)));
+
+ final long runtime = result.getNetRuntime();
+ assertThat(runtime, is(equalTo(expectedRuntime)));
}
private JobExecutionResult executeTestJobBasedOnConfig(final Configuration configuration) throws Exception {
@@ -68,19 +87,39 @@ public class ExecutorDiscoveryTest {
*/
public static class IDReportingExecutorFactory implements ExecutorFactory {
- public static final String ID = "test-executor-A";
-
@Override
public boolean isCompatibleWith(@Nonnull Configuration configuration) {
- return ID.equals(configuration.get(DeploymentOptions.TARGET));
+ return EXEC_NAME.equals(configuration.get(DeploymentOptions.TARGET));
}
@Override
public Executor getExecutor(@Nonnull Configuration configuration) {
return (pipeline, executionConfig) -> {
final Map<String, OptionalFailure<Object>> res = new HashMap<>();
- res.put(DeploymentOptions.TARGET.key(), OptionalFailure.of(ID));
- return new JobExecutionResult(new JobID(), 12L, res);
+ res.put(DeploymentOptions.TARGET.key(), OptionalFailure.of(EXEC_NAME));
+
+ return CompletableFuture.completedFuture(new JobClient(){
+
+ @Override
+ public JobID getJobID() {
+ return new JobID();
+ }
+
+ @Override
+ public CompletableFuture<JobExecutionResult> getJobSubmissionResult() {
+ return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), DETACHED_RUNTIME, res));
+ }
+
+ @Override
+ public CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull ClassLoader userClassloader) {
+ return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), ATTACHED_RUNTIME, res));
+ }
+
+ @Override
+ public void close() {
+
+ }
+ });
};
}
}
diff --git a/flink-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory b/flink-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
index fcfaa55..c09254a 100644
--- a/flink-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
+++ b/flink-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.flink.api.java.ExecutorDiscoveryTest$IDReportingExecutorFactory
\ No newline at end of file
+org.apache.flink.api.java.ExecutorDiscoveryAndJobClientTest$IDReportingExecutorFactory
\ No newline at end of file
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index ba702ea..176a7ab 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -53,6 +53,7 @@ import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.core.execution.Executor;
import org.apache.flink.core.execution.ExecutorFactory;
import org.apache.flink.core.execution.ExecutorServiceLoader;
+import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -154,6 +155,8 @@ public class StreamExecutionEnvironment {
private final Configuration configuration;
+ private ClassLoader userClassloader;
+
// --------------------------------------------------------------------------------------------
// Constructor and Properties
// --------------------------------------------------------------------------------------------
@@ -166,9 +169,16 @@ public class StreamExecutionEnvironment {
this(DefaultExecutorServiceLoader.INSTANCE, executorConfiguration);
}
- public StreamExecutionEnvironment(final ExecutorServiceLoader executorServiceLoader, final Configuration executorConfiguration) {
+ public StreamExecutionEnvironment(
+ final ExecutorServiceLoader executorServiceLoader,
+ final Configuration executorConfiguration) {
this.executorServiceLoader = checkNotNull(executorServiceLoader);
this.configuration = checkNotNull(executorConfiguration);
+ this.userClassloader = getClass().getClassLoader();
+ }
+
+ protected void setUserClassloader(final ClassLoader userClassloader) {
+ this.userClassloader = checkNotNull(userClassloader);
}
protected Configuration getConfiguration() {
@@ -1552,7 +1562,13 @@ public class StreamExecutionEnvironment {
executorServiceLoader.getExecutorFactory(configuration);
final Executor executor = executorFactory.getExecutor(configuration);
- return executor.execute(streamGraph, configuration);
+
+ try (final JobClient jobClient = executor.execute(streamGraph, configuration).get()) {
+
+ return configuration.getBoolean(DeploymentOptions.ATTACHED)
+ ? jobClient.getJobExecutionResult(userClassloader).get()
+ : jobClient.getJobSubmissionResult().get();
+ }
}
private void consolidateParallelismDefinitionsInConfiguration() {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java
similarity index 60%
rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java
index 2a1bb4a..7caf531 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.Executor;
import org.apache.flink.core.execution.ExecutorFactory;
+import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.util.OptionalFailure;
@@ -35,25 +36,43 @@ import javax.annotation.Nonnull;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsEqual.equalTo;
/**
- * Tests the {@link ExecutorFactory} discovery in the {@link StreamExecutionEnvironment}.
+ * Tests the {@link ExecutorFactory} discovery in the {@link StreamExecutionEnvironment} and the calls to the {@link JobClient}.
*/
-public class ExecutorDiscoveryTest {
+public class ExecutorDiscoveryAndJobClientTest {
+
+ private static final String EXEC_NAME = "test-executor";
+ private static final long ATTACHED_RUNTIME = 42L;
+ private static final long DETACHED_RUNTIME = 11L;
+
+ @Test
+ public void jobClientGetJobExecutionResultShouldBeCalledOnAttachedExecution() throws Exception {
+ testHelper(true, ATTACHED_RUNTIME);
+ }
@Test
- public void correctExecutorShouldBeInstantiatedBasedOnConfigurationOption() throws Exception {
+ public void jobClientGetJobExecutionResultShouldBeCalledOnDetachedExecution() throws Exception {
+ testHelper(false, DETACHED_RUNTIME);
+ }
+
+ private void testHelper(final boolean attached, final long expectedRuntime) throws Exception {
final Configuration configuration = new Configuration();
- configuration.set(DeploymentOptions.TARGET, IDReportingExecutorFactory.ID);
+ configuration.set(DeploymentOptions.TARGET, EXEC_NAME);
+ configuration.set(DeploymentOptions.ATTACHED, attached);
final JobExecutionResult result = executeTestJobBasedOnConfig(configuration);
final String executorName = result.getAllAccumulatorResults().get(DeploymentOptions.TARGET.key()).toString();
- assertThat(executorName, is(equalTo(IDReportingExecutorFactory.ID)));
+ assertThat(executorName, is(equalTo(EXEC_NAME)));
+
+ final long runtime = result.getNetRuntime();
+ assertThat(runtime, is(equalTo(expectedRuntime)));
}
private JobExecutionResult executeTestJobBasedOnConfig(final Configuration configuration) throws Exception {
@@ -69,19 +88,39 @@ public class ExecutorDiscoveryTest {
*/
public static class IDReportingExecutorFactory implements ExecutorFactory {
- public static final String ID = "test-executor-A";
-
@Override
public boolean isCompatibleWith(@Nonnull Configuration configuration) {
- return ID.equals(configuration.get(DeploymentOptions.TARGET));
+ return EXEC_NAME.equals(configuration.get(DeploymentOptions.TARGET));
}
@Override
public Executor getExecutor(@Nonnull Configuration configuration) {
return (pipeline, executionConfig) -> {
final Map<String, OptionalFailure<Object>> res = new HashMap<>();
- res.put(DeploymentOptions.TARGET.key(), OptionalFailure.of(ID));
- return new JobExecutionResult(new JobID(), 12L, res);
+ res.put(DeploymentOptions.TARGET.key(), OptionalFailure.of(EXEC_NAME));
+
+ return CompletableFuture.completedFuture(new JobClient(){
+
+ @Override
+ public JobID getJobID() {
+ return new JobID();
+ }
+
+ @Override
+ public CompletableFuture<JobExecutionResult> getJobSubmissionResult() {
+ return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), DETACHED_RUNTIME, res));
+ }
+
+ @Override
+ public CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull ClassLoader userClassloader) {
+ return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), ATTACHED_RUNTIME, res));
+ }
+
+ @Override
+ public void close() {
+
+ }
+ });
};
}
}
diff --git a/flink-streaming-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory b/flink-streaming-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
index 68ddbcb..a5186ae 100644
--- a/flink-streaming-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
+++ b/flink-streaming-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.flink.streaming.environment.ExecutorDiscoveryTest$IDReportingExecutorFactory
\ No newline at end of file
+org.apache.flink.streaming.environment.ExecutorDiscoveryAndJobClientTest$IDReportingExecutorFactory
\ No newline at end of file