You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/19 04:42:15 UTC
[flink] 04/15: [hotfix] Annotate all the Executor-related
interfaces as @Internal
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch executors-clean
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 719bf99de9655c8c47b6e5c6effdad1b8a93ee14
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Nov 18 14:22:03 2019 +0100
[hotfix] Annotate all the Executor-related interfaces as @Internal
---
.../apache/flink/core/execution/DefaultExecutorServiceLoader.java | 6 +++++-
.../src/main/java/org/apache/flink/core/execution/Executor.java | 7 ++++++-
.../java/org/apache/flink/core/execution/ExecutorFactory.java | 8 ++++++--
.../org/apache/flink/core/execution/ExecutorServiceLoader.java | 6 +++++-
.../java/org/apache/flink/api/java/ExecutorDiscoveryTest.java | 6 ++++--
.../apache/flink/streaming/environment/ExecutorDiscoveryTest.java | 6 ++++--
6 files changed, 30 insertions(+), 9 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
index 8bde967..a088f8d 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
@@ -18,11 +18,14 @@
package org.apache.flink.core.execution;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
+
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -35,6 +38,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* The default implementation of the {@link ExecutorServiceLoader}. This implementation uses
* Java service discovery to find the available {@link ExecutorFactory executor factories}.
*/
+@Internal
public class DefaultExecutorServiceLoader implements ExecutorServiceLoader {
// TODO: This code is almost identical to the ClusterClientServiceLoader and its default implementation.
@@ -48,7 +52,7 @@ public class DefaultExecutorServiceLoader implements ExecutorServiceLoader {
public static final DefaultExecutorServiceLoader INSTANCE = new DefaultExecutorServiceLoader();
@Override
- public ExecutorFactory getExecutorFactory(final Configuration configuration) {
+ public ExecutorFactory getExecutorFactory(@Nonnull final Configuration configuration) {
checkNotNull(configuration);
final List<ExecutorFactory> compatibleFactories = new ArrayList<>();
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
index 3476742..7069e70 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
@@ -18,20 +18,25 @@
package org.apache.flink.core.execution;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.configuration.Configuration;
+import javax.annotation.Nonnull;
+
/**
* The entity responsible for executing a {@link Pipeline}, i.e. a user job.
*/
+@Internal
public interface Executor {
/**
* Executes a {@link Pipeline} based on the provided configuration.
+ *
* @param pipeline the {@link Pipeline} to execute
* @param configuration the {@link Configuration} with the required execution parameters
* @return the {@link JobExecutionResult} corresponding to the pipeline execution.
*/
- JobExecutionResult execute(Pipeline pipeline, Configuration configuration) throws Exception;
+ JobExecutionResult execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception;
}
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/ExecutorFactory.java b/flink-core/src/main/java/org/apache/flink/core/execution/ExecutorFactory.java
index 8d6687b..9d6860a 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/ExecutorFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/ExecutorFactory.java
@@ -18,23 +18,27 @@
package org.apache.flink.core.execution;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
+import javax.annotation.Nonnull;
+
/**
* A factory for selecting and instantiating the adequate {@link Executor}
* based on a provided {@link Configuration}.
*/
+@Internal
public interface ExecutorFactory {
/**
* Returns {@code true} if this factory is compatible with the options in the
* provided configuration, {@code false} otherwise.
*/
- boolean isCompatibleWith(Configuration configuration);
+ boolean isCompatibleWith(@Nonnull final Configuration configuration);
/**
* Instantiates an {@link Executor} compatible with the provided configuration.
* @return the executor instance.
*/
- Executor getExecutor(Configuration configuration);
+ Executor getExecutor(@Nonnull Configuration configuration);
}
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/ExecutorServiceLoader.java b/flink-core/src/main/java/org/apache/flink/core/execution/ExecutorServiceLoader.java
index 0b959eb7..5aee4ee 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/ExecutorServiceLoader.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/ExecutorServiceLoader.java
@@ -18,12 +18,16 @@
package org.apache.flink.core.execution;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
+import javax.annotation.Nonnull;
+
/**
* An interface to be implemented by the entity responsible for finding the correct {@link Executor} to
* execute a given {@link org.apache.flink.api.dag.Pipeline}.
*/
+@Internal
public interface ExecutorServiceLoader {
/**
@@ -35,5 +39,5 @@ public interface ExecutorServiceLoader {
* @throws Exception if there is more than one compatible factories, or something went wrong when
* loading the registered factories.
*/
- ExecutorFactory getExecutorFactory(Configuration configuration) throws Exception;
+ ExecutorFactory getExecutorFactory(@Nonnull final Configuration configuration) throws Exception;
}
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
index 49013b8..2d46915 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
@@ -29,6 +29,8 @@ import org.apache.flink.util.OptionalFailure;
import org.junit.Test;
+import javax.annotation.Nonnull;
+
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -69,12 +71,12 @@ public class ExecutorDiscoveryTest {
public static final String ID = "test-executor-A";
@Override
- public boolean isCompatibleWith(Configuration configuration) {
+ public boolean isCompatibleWith(@Nonnull Configuration configuration) {
return ID.equals(configuration.get(DeploymentOptions.TARGET));
}
@Override
- public Executor getExecutor(Configuration configuration) {
+ public Executor getExecutor(@Nonnull Configuration configuration) {
return (pipeline, executionConfig) -> {
final Map<String, OptionalFailure<Object>> res = new HashMap<>();
res.put(DeploymentOptions.TARGET.key(), OptionalFailure.of(ID));
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
index 9c11fdf..2a1bb4a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
@@ -30,6 +30,8 @@ import org.apache.flink.util.OptionalFailure;
import org.junit.Test;
+import javax.annotation.Nonnull;
+
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -70,12 +72,12 @@ public class ExecutorDiscoveryTest {
public static final String ID = "test-executor-A";
@Override
- public boolean isCompatibleWith(Configuration configuration) {
+ public boolean isCompatibleWith(@Nonnull Configuration configuration) {
return ID.equals(configuration.get(DeploymentOptions.TARGET));
}
@Override
- public Executor getExecutor(Configuration configuration) {
+ public Executor getExecutor(@Nonnull Configuration configuration) {
return (pipeline, executionConfig) -> {
final Map<String, OptionalFailure<Object>> res = new HashMap<>();
res.put(DeploymentOptions.TARGET.key(), OptionalFailure.of(ID));