You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2019/10/16 12:43:16 UTC
[flink] 05/15: [FLINK-14290] Use LocalExecutor in LocalStreamEnvironment
This is an automated email from the ASF dual-hosted git repository.
aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b47c13c00be47d3a9132761134261d80295920b4
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Thu Sep 12 13:37:39 2019 +0200
[FLINK-14290] Use LocalExecutor in LocalStreamEnvironment
---
.../api/environment/LocalStreamEnvironment.java | 48 ++--------------------
1 file changed, 4 insertions(+), 44 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index f655af6..54f4354 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -20,18 +20,11 @@ package org.apache.flink.streaming.api.environment;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.minicluster.MiniCluster;
-import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import javax.annotation.Nonnull;
/**
@@ -45,8 +38,6 @@ import javax.annotation.Nonnull;
@Public
public class LocalStreamEnvironment extends StreamExecutionEnvironment {
- private static final Logger LOG = LoggerFactory.getLogger(LocalStreamEnvironment.class);
-
private final Configuration configuration;
/**
@@ -83,42 +74,11 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
*/
@Override
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
- JobGraph jobGraph = streamGraph.getJobGraph();
- jobGraph.setAllowQueuedScheduling(true);
-
- Configuration configuration = new Configuration();
- configuration.addAll(jobGraph.getJobConfiguration());
- configuration.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "0");
-
- // add (and override) the settings with what the user defined
- configuration.addAll(this.configuration);
-
- if (!configuration.contains(RestOptions.BIND_PORT)) {
- configuration.setString(RestOptions.BIND_PORT, "0");
- }
-
- int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
-
- MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
- .setConfiguration(configuration)
- .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
- .build();
-
- if (LOG.isInfoEnabled()) {
- LOG.info("Running job on local embedded Flink mini cluster");
- }
-
- MiniCluster miniCluster = new MiniCluster(cfg);
-
try {
- miniCluster.start();
- configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().get().getPort());
-
- return miniCluster.executeJobBlocking(jobGraph);
- }
- finally {
+ final PlanExecutor executor = PlanExecutor.createLocalExecutor(configuration);
+ return executor.executePlan(streamGraph);
+ } finally {
transformations.clear();
- miniCluster.close();
}
}
}