You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/18 21:00:41 UTC

[flink] 09/13: [FLINK-XXXXX] Update the Executor interface and introduce the JobClient

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executors-clean
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 98e93d522dba8218b44024290862e5424fc35b39
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Nov 18 16:31:04 2019 +0100

    [FLINK-XXXXX] Update the Executor interface and introduce the JobClient
---
 .../deployment/AbstractJobClusterExecutor.java     |  4 ++
 .../deployment/AbstractSessionClusterExecutor.java |  4 ++
 .../flink/client/deployment/ExecutorUtils.java     |  4 ++
 .../flink/client/deployment/JobClientImpl.java     |  4 ++
 .../org/apache/flink/core/execution/Executor.java  | 13 +++--
 .../execution/{Executor.java => JobClient.java}    | 31 ++++++++----
 .../flink/api/java/ExecutionEnvironment.java       | 19 ++++++-
 ...java => ExecutorDiscoveryAndJobClientTest.java} | 59 ++++++++++++++++++----
 ...org.apache.flink.core.execution.ExecutorFactory |  2 +-
 .../environment/StreamExecutionEnvironment.java    | 20 +++++++-
 ...java => ExecutorDiscoveryAndJobClientTest.java} | 59 ++++++++++++++++++----
 ...org.apache.flink.core.execution.ExecutorFactory |  2 +-
 12 files changed, 180 insertions(+), 41 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
new file mode 100644
index 0000000..14a93bc
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
@@ -0,0 +1,4 @@
+package org.apache.flink.client.deployment;
+
+public class AbstractJobClusterExecutor {
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java
new file mode 100644
index 0000000..ab8cc01
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java
@@ -0,0 +1,4 @@
+package org.apache.flink.client.deployment;
+
+public class AbstractSessionClusterExecutor {
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java
new file mode 100644
index 0000000..e134206
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java
@@ -0,0 +1,4 @@
+package org.apache.flink.client.deployment;
+
+public class ExecutorUtils {
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/JobClientImpl.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/JobClientImpl.java
new file mode 100644
index 0000000..e811fc8
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/JobClientImpl.java
@@ -0,0 +1,4 @@
+package org.apache.flink.client.deployment;
+
+public class JobClientImpl {
+}
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
index 7069e70..1d606e8 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
@@ -19,12 +19,13 @@
 package org.apache.flink.core.execution;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.configuration.Configuration;
 
 import javax.annotation.Nonnull;
 
+import java.util.concurrent.CompletableFuture;
+
 /**
  * The entity responsible for executing a {@link Pipeline}, i.e. a user job.
  */
@@ -32,11 +33,15 @@ import javax.annotation.Nonnull;
 public interface Executor {
 
 	/**
-	 * Executes a {@link Pipeline} based on the provided configuration.
+	 * Executes a {@link Pipeline} based on the provided configuration and returns a {@link JobClient} which allows to
+	 * interact with the job being executed, e.g. cancel it or take a savepoint.
+	 *
+	 * <p><b>ATTENTION:</b> The caller is responsible for managing the lifecycle of the returned {@link JobClient}. This
+	 * means that e.g. {@code close()} should be called explicitly at the call-site.
 	 *
 	 * @param pipeline the {@link Pipeline} to execute
 	 * @param configuration the {@link Configuration} with the required execution parameters
-	 * @return the {@link JobExecutionResult} corresponding to the pipeline execution.
+	 * @return a {@link CompletableFuture} with the {@link JobClient} corresponding to the pipeline.
 	 */
-	JobExecutionResult execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception;
+	CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception;
 }
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
similarity index 53%
copy from flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
copy to flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
index 7069e70..06310bd 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
@@ -18,25 +18,34 @@
 
 package org.apache.flink.core.execution;
 
-import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.dag.Pipeline;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.common.JobID;
 
 import javax.annotation.Nonnull;
 
+import java.util.concurrent.CompletableFuture;
+
 /**
- * The entity responsible for executing a {@link Pipeline}, i.e. a user job.
+ * A client that is scoped to a specific job.
  */
-@Internal
-public interface Executor {
+@PublicEvolving
+public interface JobClient extends AutoCloseable {
+
+	/**
+	 * Returns the {@link JobID} that uniquely identifies the job this client is scoped to.
+	 */
+	JobID getJobID();
+
+	/**
+	 * Returns the result of the job submission which will also contain the job id of the submitted job.
+	 */
+	CompletableFuture<JobExecutionResult> getJobSubmissionResult();
 
 	/**
-	 * Executes a {@link Pipeline} based on the provided configuration.
+	 * Returns the {@link JobExecutionResult result of the job execution} of the submitted job.
 	 *
-	 * @param pipeline the {@link Pipeline} to execute
-	 * @param configuration the {@link Configuration} with the required execution parameters
-	 * @return the {@link JobExecutionResult} corresponding to the pipeline execution.
+	 * @param userClassloader the classloader used to de-serialize the accumulators of the job.
 	 */
-	JobExecutionResult execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception;
+	CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull final ClassLoader userClassloader);
 }
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 26632e6..d485105 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -57,6 +57,7 @@ import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.ExecutorFactory;
 import org.apache.flink.core.execution.ExecutorServiceLoader;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.NumberSequenceIterator;
@@ -132,6 +133,8 @@ public class ExecutionEnvironment {
 
 	private final Configuration configuration;
 
+	private ClassLoader userClassloader;
+
 	/**
 	 * Creates a new Execution Environment.
 	 */
@@ -146,6 +149,11 @@ public class ExecutionEnvironment {
 	protected ExecutionEnvironment(final ExecutorServiceLoader executorServiceLoader, final Configuration executorConfiguration) {
 		this.executorServiceLoader = checkNotNull(executorServiceLoader);
 		this.configuration = checkNotNull(executorConfiguration);
+		this.userClassloader = getClass().getClassLoader();
+	}
+
+	protected void setUserClassloader(final ClassLoader userClassloader) {
+		this.userClassloader = checkNotNull(userClassloader);
 	}
 
 	protected Configuration getConfiguration() {
@@ -796,8 +804,15 @@ public class ExecutionEnvironment {
 				executorServiceLoader.getExecutorFactory(configuration);
 
 		final Executor executor = executorFactory.getExecutor(configuration);
-		lastJobExecutionResult = executor.execute(plan, configuration);
-		return lastJobExecutionResult;
+
+		try (final JobClient jobClient = executor.execute(plan, configuration).get()) {
+
+			lastJobExecutionResult = configuration.getBoolean(DeploymentOptions.ATTACHED)
+					? jobClient.getJobExecutionResult(userClassloader).get()
+					: jobClient.getJobSubmissionResult().get();
+
+			return lastJobExecutionResult;
+		}
 	}
 
 	private void consolidateParallelismDefinitionsInConfiguration() {
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryAndJobClientTest.java
similarity index 60%
rename from flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
rename to flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryAndJobClientTest.java
index 2d46915..c674c7e 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryAndJobClientTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.ExecutorFactory;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.util.OptionalFailure;
 
 import org.junit.Test;
@@ -34,25 +35,43 @@ import javax.annotation.Nonnull;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 import static org.hamcrest.core.IsEqual.equalTo;
 
 /**
- * Tests the {@link ExecutorFactory} discovery in the {@link ExecutionEnvironment}.
+ * Tests the {@link ExecutorFactory} discovery in the {@link ExecutionEnvironment} and the calls of the {@link JobClient}.
  */
-public class ExecutorDiscoveryTest {
+public class ExecutorDiscoveryAndJobClientTest {
+
+	private static final String EXEC_NAME = "test-executor";
+	private static final long ATTACHED_RUNTIME = 42L;
+	private static final long DETACHED_RUNTIME = 11L;
+
+	@Test
+	public void jobClientGetJobExecutionResultShouldBeCalledOnAttachedExecution() throws Exception {
+		testHelper(true, ATTACHED_RUNTIME);
+	}
 
 	@Test
-	public void correctExecutorShouldBeInstantiatedBasedOnConfigurationOption() throws Exception {
+	public void jobClientGetJobExecutionResultShouldBeCalledOnDetachedExecution() throws Exception {
+		testHelper(false, DETACHED_RUNTIME);
+	}
+
+	private void testHelper(final boolean attached, final long expectedRuntime) throws Exception {
 		final Configuration configuration = new Configuration();
-		configuration.set(DeploymentOptions.TARGET, IDReportingExecutorFactory.ID);
+		configuration.set(DeploymentOptions.TARGET, EXEC_NAME);
+		configuration.set(DeploymentOptions.ATTACHED, attached);
 
 		final JobExecutionResult result = executeTestJobBasedOnConfig(configuration);
 
 		final String executorName = result.getAllAccumulatorResults().get(DeploymentOptions.TARGET.key()).toString();
-		assertThat(executorName, is(equalTo(IDReportingExecutorFactory.ID)));
+		assertThat(executorName, is(equalTo(EXEC_NAME)));
+
+		final long runtime = result.getNetRuntime();
+		assertThat(runtime, is(equalTo(expectedRuntime)));
 	}
 
 	private JobExecutionResult executeTestJobBasedOnConfig(final Configuration configuration) throws Exception {
@@ -68,19 +87,39 @@ public class ExecutorDiscoveryTest {
 	 */
 	public static class IDReportingExecutorFactory implements ExecutorFactory {
 
-		public static final String ID = "test-executor-A";
-
 		@Override
 		public boolean isCompatibleWith(@Nonnull Configuration configuration) {
-			return ID.equals(configuration.get(DeploymentOptions.TARGET));
+			return EXEC_NAME.equals(configuration.get(DeploymentOptions.TARGET));
 		}
 
 		@Override
 		public Executor getExecutor(@Nonnull Configuration configuration) {
 			return (pipeline, executionConfig) -> {
 				final Map<String, OptionalFailure<Object>> res = new HashMap<>();
-				res.put(DeploymentOptions.TARGET.key(), OptionalFailure.of(ID));
-				return new JobExecutionResult(new JobID(), 12L, res);
+				res.put(DeploymentOptions.TARGET.key(), OptionalFailure.of(EXEC_NAME));
+
+				return CompletableFuture.completedFuture(new JobClient(){
+
+					@Override
+					public JobID getJobID() {
+						return new JobID();
+					}
+
+					@Override
+					public CompletableFuture<JobExecutionResult> getJobSubmissionResult() {
+						return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), DETACHED_RUNTIME, res));
+					}
+
+					@Override
+					public CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull ClassLoader userClassloader) {
+						return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), ATTACHED_RUNTIME, res));
+					}
+
+					@Override
+					public void close() {
+
+					}
+				});
 			};
 		}
 	}
diff --git a/flink-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory b/flink-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
index fcfaa55..c09254a 100644
--- a/flink-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
+++ b/flink-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.api.java.ExecutorDiscoveryTest$IDReportingExecutorFactory
\ No newline at end of file
+org.apache.flink.api.java.ExecutorDiscoveryAndJobClientTest$IDReportingExecutorFactory
\ No newline at end of file
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index ba702ea..176a7ab 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -53,6 +53,7 @@ import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.ExecutorFactory;
 import org.apache.flink.core.execution.ExecutorServiceLoader;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -154,6 +155,8 @@ public class StreamExecutionEnvironment {
 
 	private final Configuration configuration;
 
+	private ClassLoader userClassloader;
+
 	// --------------------------------------------------------------------------------------------
 	// Constructor and Properties
 	// --------------------------------------------------------------------------------------------
@@ -166,9 +169,16 @@ public class StreamExecutionEnvironment {
 		this(DefaultExecutorServiceLoader.INSTANCE, executorConfiguration);
 	}
 
-	public StreamExecutionEnvironment(final ExecutorServiceLoader executorServiceLoader, final Configuration executorConfiguration) {
+	public StreamExecutionEnvironment(
+			final ExecutorServiceLoader executorServiceLoader,
+			final Configuration executorConfiguration) {
 		this.executorServiceLoader = checkNotNull(executorServiceLoader);
 		this.configuration = checkNotNull(executorConfiguration);
+		this.userClassloader = getClass().getClassLoader();
+	}
+
+	protected void setUserClassloader(final ClassLoader userClassloader) {
+		this.userClassloader = checkNotNull(userClassloader);
 	}
 
 	protected Configuration getConfiguration() {
@@ -1552,7 +1562,13 @@ public class StreamExecutionEnvironment {
 				executorServiceLoader.getExecutorFactory(configuration);
 
 		final Executor executor = executorFactory.getExecutor(configuration);
-		return executor.execute(streamGraph, configuration);
+
+		try (final JobClient jobClient = executor.execute(streamGraph, configuration).get()) {
+
+			return configuration.getBoolean(DeploymentOptions.ATTACHED)
+					? jobClient.getJobExecutionResult(userClassloader).get()
+					: jobClient.getJobSubmissionResult().get();
+		}
 	}
 
 	private void consolidateParallelismDefinitionsInConfiguration() {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java
similarity index 60%
rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java
index 2a1bb4a..7caf531 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.ExecutorFactory;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.util.OptionalFailure;
@@ -35,25 +36,43 @@ import javax.annotation.Nonnull;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 import static org.hamcrest.core.IsEqual.equalTo;
 
 /**
- * Tests the {@link ExecutorFactory} discovery in the {@link StreamExecutionEnvironment}.
+ * Tests the {@link ExecutorFactory} discovery in the {@link StreamExecutionEnvironment} and the calls of the {@link JobClient}.
  */
-public class ExecutorDiscoveryTest {
+public class ExecutorDiscoveryAndJobClientTest {
+
+	private static final String EXEC_NAME = "test-executor";
+	private static final long ATTACHED_RUNTIME = 42L;
+	private static final long DETACHED_RUNTIME = 11L;
+
+	@Test
+	public void jobClientGetJobExecutionResultShouldBeCalledOnAttachedExecution() throws Exception {
+		testHelper(true, ATTACHED_RUNTIME);
+	}
 
 	@Test
-	public void correctExecutorShouldBeInstantiatedBasedOnConfigurationOption() throws Exception {
+	public void jobClientGetJobExecutionResultShouldBeCalledOnDetachedExecution() throws Exception {
+		testHelper(false, DETACHED_RUNTIME);
+	}
+
+	private void testHelper(final boolean attached, final long expectedRuntime) throws Exception {
 		final Configuration configuration = new Configuration();
-		configuration.set(DeploymentOptions.TARGET, IDReportingExecutorFactory.ID);
+		configuration.set(DeploymentOptions.TARGET, EXEC_NAME);
+		configuration.set(DeploymentOptions.ATTACHED, attached);
 
 		final JobExecutionResult result = executeTestJobBasedOnConfig(configuration);
 
 		final String executorName = result.getAllAccumulatorResults().get(DeploymentOptions.TARGET.key()).toString();
-		assertThat(executorName, is(equalTo(IDReportingExecutorFactory.ID)));
+		assertThat(executorName, is(equalTo(EXEC_NAME)));
+
+		final long runtime = result.getNetRuntime();
+		assertThat(runtime, is(equalTo(expectedRuntime)));
 	}
 
 	private JobExecutionResult executeTestJobBasedOnConfig(final Configuration configuration) throws Exception {
@@ -69,19 +88,39 @@ public class ExecutorDiscoveryTest {
 	 */
 	public static class IDReportingExecutorFactory implements ExecutorFactory {
 
-		public static final String ID = "test-executor-A";
-
 		@Override
 		public boolean isCompatibleWith(@Nonnull Configuration configuration) {
-			return ID.equals(configuration.get(DeploymentOptions.TARGET));
+			return EXEC_NAME.equals(configuration.get(DeploymentOptions.TARGET));
 		}
 
 		@Override
 		public Executor getExecutor(@Nonnull Configuration configuration) {
 			return (pipeline, executionConfig) -> {
 				final Map<String, OptionalFailure<Object>> res = new HashMap<>();
-				res.put(DeploymentOptions.TARGET.key(), OptionalFailure.of(ID));
-				return new JobExecutionResult(new JobID(), 12L, res);
+				res.put(DeploymentOptions.TARGET.key(), OptionalFailure.of(EXEC_NAME));
+
+				return CompletableFuture.completedFuture(new JobClient(){
+
+					@Override
+					public JobID getJobID() {
+						return new JobID();
+					}
+
+					@Override
+					public CompletableFuture<JobExecutionResult> getJobSubmissionResult() {
+						return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), DETACHED_RUNTIME, res));
+					}
+
+					@Override
+					public CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull ClassLoader userClassloader) {
+						return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), ATTACHED_RUNTIME, res));
+					}
+
+					@Override
+					public void close() {
+
+					}
+				});
 			};
 		}
 	}
diff --git a/flink-streaming-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory b/flink-streaming-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
index 68ddbcb..a5186ae 100644
--- a/flink-streaming-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
+++ b/flink-streaming-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.streaming.environment.ExecutorDiscoveryTest$IDReportingExecutorFactory
\ No newline at end of file
+org.apache.flink.streaming.environment.ExecutorDiscoveryAndJobClientTest$IDReportingExecutorFactory
\ No newline at end of file