You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/19 04:42:15 UTC

[flink] 04/15: [hotfix] Annotate all the Executor-related interfaces as @Internal

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executors-clean
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 719bf99de9655c8c47b6e5c6effdad1b8a93ee14
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Nov 18 14:22:03 2019 +0100

    [hotfix] Annotate all the Executor-related interfaces as @Internal
---
 .../apache/flink/core/execution/DefaultExecutorServiceLoader.java | 6 +++++-
 .../src/main/java/org/apache/flink/core/execution/Executor.java   | 7 ++++++-
 .../java/org/apache/flink/core/execution/ExecutorFactory.java     | 8 ++++++--
 .../org/apache/flink/core/execution/ExecutorServiceLoader.java    | 6 +++++-
 .../java/org/apache/flink/api/java/ExecutorDiscoveryTest.java     | 6 ++++--
 .../apache/flink/streaming/environment/ExecutorDiscoveryTest.java | 6 ++++--
 6 files changed, 30 insertions(+), 9 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
index 8bde967..a088f8d 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
@@ -18,11 +18,14 @@
 
 package org.apache.flink.core.execution;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -35,6 +38,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * The default implementation of the {@link ExecutorServiceLoader}. This implementation uses
  * Java service discovery to find the available {@link ExecutorFactory executor factories}.
  */
+@Internal
 public class DefaultExecutorServiceLoader implements ExecutorServiceLoader {
 
 	// TODO: This code is almost identical to the ClusterClientServiceLoader and its default implementation.
@@ -48,7 +52,7 @@ public class DefaultExecutorServiceLoader implements ExecutorServiceLoader {
 	public static final DefaultExecutorServiceLoader INSTANCE = new DefaultExecutorServiceLoader();
 
 	@Override
-	public ExecutorFactory getExecutorFactory(final Configuration configuration) {
+	public ExecutorFactory getExecutorFactory(@Nonnull final Configuration configuration) {
 		checkNotNull(configuration);
 
 		final List<ExecutorFactory> compatibleFactories = new ArrayList<>();
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
index 3476742..7069e70 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
@@ -18,20 +18,25 @@
 
 package org.apache.flink.core.execution;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.configuration.Configuration;
 
+import javax.annotation.Nonnull;
+
 /**
  * The entity responsible for executing a {@link Pipeline}, i.e. a user job.
  */
+@Internal
 public interface Executor {
 
 	/**
 	 * Executes a {@link Pipeline} based on the provided configuration.
+	 *
 	 * @param pipeline the {@link Pipeline} to execute
 	 * @param configuration the {@link Configuration} with the required execution parameters
 	 * @return the {@link JobExecutionResult} corresponding to the pipeline execution.
 	 */
-	JobExecutionResult execute(Pipeline pipeline, Configuration configuration) throws Exception;
+	JobExecutionResult execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception;
 }
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/ExecutorFactory.java b/flink-core/src/main/java/org/apache/flink/core/execution/ExecutorFactory.java
index 8d6687b..9d6860a 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/ExecutorFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/ExecutorFactory.java
@@ -18,23 +18,27 @@
 
 package org.apache.flink.core.execution;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
 
+import javax.annotation.Nonnull;
+
 /**
  * A factory for selecting and instantiating the adequate {@link Executor}
  * based on a provided {@link Configuration}.
  */
+@Internal
 public interface ExecutorFactory {
 
 	/**
 	 * Returns {@code true} if this factory is compatible with the options in the
 	 * provided configuration, {@code false} otherwise.
 	 */
-	boolean isCompatibleWith(Configuration configuration);
+	boolean isCompatibleWith(@Nonnull final Configuration configuration);
 
 	/**
 	 * Instantiates an {@link Executor} compatible with the provided configuration.
 	 * @return the executor instance.
 	 */
-	Executor getExecutor(Configuration configuration);
+	Executor getExecutor(@Nonnull Configuration configuration);
 }
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/ExecutorServiceLoader.java b/flink-core/src/main/java/org/apache/flink/core/execution/ExecutorServiceLoader.java
index 0b959eb7..5aee4ee 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/ExecutorServiceLoader.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/ExecutorServiceLoader.java
@@ -18,12 +18,16 @@
 
 package org.apache.flink.core.execution;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
 
+import javax.annotation.Nonnull;
+
 /**
  * An interface to be implemented by the entity responsible for finding the correct {@link Executor} to
  * execute a given {@link org.apache.flink.api.dag.Pipeline}.
  */
+@Internal
 public interface ExecutorServiceLoader {
 
 	/**
@@ -35,5 +39,5 @@ public interface ExecutorServiceLoader {
 	 * @throws Exception if there is more than one compatible factory, or something went wrong when
 	 * 			loading the registered factories.
 	 */
-	ExecutorFactory getExecutorFactory(Configuration configuration) throws Exception;
+	ExecutorFactory getExecutorFactory(@Nonnull final Configuration configuration) throws Exception;
 }
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
index 49013b8..2d46915 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
@@ -29,6 +29,8 @@ import org.apache.flink.util.OptionalFailure;
 
 import org.junit.Test;
 
+import javax.annotation.Nonnull;
+
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -69,12 +71,12 @@ public class ExecutorDiscoveryTest {
 		public static final String ID = "test-executor-A";
 
 		@Override
-		public boolean isCompatibleWith(Configuration configuration) {
+		public boolean isCompatibleWith(@Nonnull Configuration configuration) {
 			return ID.equals(configuration.get(DeploymentOptions.TARGET));
 		}
 
 		@Override
-		public Executor getExecutor(Configuration configuration) {
+		public Executor getExecutor(@Nonnull Configuration configuration) {
 			return (pipeline, executionConfig) -> {
 				final Map<String, OptionalFailure<Object>> res = new HashMap<>();
 				res.put(DeploymentOptions.TARGET.key(), OptionalFailure.of(ID));
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
index 9c11fdf..2a1bb4a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
@@ -30,6 +30,8 @@ import org.apache.flink.util.OptionalFailure;
 
 import org.junit.Test;
 
+import javax.annotation.Nonnull;
+
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -70,12 +72,12 @@ public class ExecutorDiscoveryTest {
 		public static final String ID = "test-executor-A";
 
 		@Override
-		public boolean isCompatibleWith(Configuration configuration) {
+		public boolean isCompatibleWith(@Nonnull Configuration configuration) {
 			return ID.equals(configuration.get(DeploymentOptions.TARGET));
 		}
 
 		@Override
-		public Executor getExecutor(Configuration configuration) {
+		public Executor getExecutor(@Nonnull Configuration configuration) {
 			return (pipeline, executionConfig) -> {
 				final Map<String, OptionalFailure<Object>> res = new HashMap<>();
 				res.put(DeploymentOptions.TARGET.key(), OptionalFailure.of(ID));