You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2019/10/16 12:43:16 UTC

[flink] 05/15: [FLINK-14290] Use LocalExecutor in LocalStreamEnvironment

This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b47c13c00be47d3a9132761134261d80295920b4
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Thu Sep 12 13:37:39 2019 +0200

    [FLINK-14290] Use LocalExecutor in LocalStreamEnvironment
---
 .../api/environment/LocalStreamEnvironment.java    | 48 ++--------------------
 1 file changed, 4 insertions(+), 44 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index f655af6..54f4354 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -20,18 +20,11 @@ package org.apache.flink.streaming.api.environment;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.PlanExecutor;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.minicluster.MiniCluster;
-import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import javax.annotation.Nonnull;
 
 /**
@@ -45,8 +38,6 @@ import javax.annotation.Nonnull;
 @Public
 public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 
-	private static final Logger LOG = LoggerFactory.getLogger(LocalStreamEnvironment.class);
-
 	private final Configuration configuration;
 
 	/**
@@ -83,42 +74,11 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 	 */
 	@Override
 	public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
-		JobGraph jobGraph = streamGraph.getJobGraph();
-		jobGraph.setAllowQueuedScheduling(true);
-
-		Configuration configuration = new Configuration();
-		configuration.addAll(jobGraph.getJobConfiguration());
-		configuration.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "0");
-
-		// add (and override) the settings with what the user defined
-		configuration.addAll(this.configuration);
-
-		if (!configuration.contains(RestOptions.BIND_PORT)) {
-			configuration.setString(RestOptions.BIND_PORT, "0");
-		}
-
-		int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
-
-		MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
-			.setConfiguration(configuration)
-			.setNumSlotsPerTaskManager(numSlotsPerTaskManager)
-			.build();
-
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Running job on local embedded Flink mini cluster");
-		}
-
-		MiniCluster miniCluster = new MiniCluster(cfg);
-
 		try {
-			miniCluster.start();
-			configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().get().getPort());
-
-			return miniCluster.executeJobBlocking(jobGraph);
-		}
-		finally {
+			final PlanExecutor executor = PlanExecutor.createLocalExecutor(configuration);
+			return executor.executePlan(streamGraph);
+		} finally {
 			transformations.clear();
-			miniCluster.close();
 		}
 	}
 }