Posted to commits@flink.apache.org by ch...@apache.org on 2018/06/14 23:49:33 UTC

[6/7] flink git commit: [FLINK-9591][py] Remove remnants of distributed-cache logic

[FLINK-9591][py] Remove remnants of distributed-cache logic


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0dc0e36f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0dc0e36f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0dc0e36f

Branch: refs/heads/master
Commit: 0dc0e36f204401f14e078d24348421cd4140577d
Parents: 24266fd
Author: zentol <ch...@apache.org>
Authored: Thu Jun 14 19:58:32 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Jun 14 22:35:13 2018 +0200

----------------------------------------------------------------------
 docs/dev/batch/python.md                                |  8 ++------
 .../java/org/apache/flink/python/api/PythonOptions.java | 10 ----------
 .../org/apache/flink/python/api/PythonPlanBinder.java   | 12 +-----------
 .../apache/flink/python/api/flink/plan/Environment.py   |  3 ---
 4 files changed, 3 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0dc0e36f/docs/dev/batch/python.md
----------------------------------------------------------------------
diff --git a/docs/dev/batch/python.md b/docs/dev/batch/python.md
index 2211102..486aa18 100644
--- a/docs/dev/batch/python.md
+++ b/docs/dev/batch/python.md
@@ -136,10 +136,7 @@ The first two do as the name suggests.
 Please refer to [Data Sinks](#data-sinks) for more information on writing to files.
 
 Once you specified the complete program you need to call `execute` on
-the `Environment`. This will either execute on your local machine or submit your program
-for execution on a cluster, depending on how Flink was started. You can force
-a local execution by using `execute(local=True)`.
-
+the `Environment`. This will submit your program for execution on a cluster.
 {% top %}
 
 Project setup
@@ -159,8 +156,7 @@ Lazy Evaluation
 All Flink programs are executed lazily: When the program's main method is executed, the data loading
 and transformations do not happen directly. Rather, each operation is created and added to the
 program's plan. The operations are actually executed when one of the `execute()` methods is invoked
-on the Environment object. Whether the program is executed locally or on a cluster depends
-on the environment of the program.
+on the Environment object.
 
 The lazy evaluation lets you construct sophisticated programs that Flink executes as one
 holistically planned unit.
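
The docs hunk above now states unconditionally that execute() submits the plan to a cluster. For context, a minimal sketch of a Python batch program in the style of docs/dev/batch/python.md (the data and operators are illustrative, not part of this commit):

    from flink.plan.Environment import get_environment

    env = get_environment()

    # Lazy evaluation: these calls only extend the program's plan, nothing runs yet.
    data = env.from_elements("to be", "or not to be")
    data \
        .flat_map(lambda line, collector: [(word, 1) for word in line.split()]) \
        .output()

    # Submits the plan for execution on the cluster; the execute(local=True)
    # switch described by the removed lines above no longer applies.
    env.execute()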

http://git-wip-us.apache.org/repos/asf/flink/blob/0dc0e36f/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOptions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOptions.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOptions.java
index 4137c11..f89a0fe 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOptions.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOptions.java
@@ -20,8 +20,6 @@ package org.apache.flink.python.api;
 
 import org.apache.flink.configuration.ConfigOption;
 
-import java.io.File;
-
 import static org.apache.flink.configuration.ConfigOptions.key;
 
 /**
@@ -62,14 +60,6 @@ public class PythonOptions {
 		key("python.mmap.tmp.dir")
 			.noDefaultValue();
 
-	/**
-	 * The config parameter defining where the flink python library and user supplied files will be uploaded to before
-	 * registering them with the Distributed Cache. This directory must be accessible from all worker nodes.
-	 */
-	public static final ConfigOption<String> DC_TMP_DIR =
-		key("python.dc.tmp.dir")
-			.defaultValue(System.getProperty("java.io.tmpdir") + File.separator + "flink_dc");
-
 	private PythonOptions() {
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0dc0e36f/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index 1df15a5..1182708 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -80,7 +80,6 @@ public class PythonPlanBinder {
 	private final Configuration operatorConfig;
 
 	private final String tmpPlanFilesDir;
-	private Path tmpDistributedDir;
 
 	private final SetCache sets = new SetCache();
 	private int currentEnvironmentID = 0;
@@ -109,8 +108,6 @@ public class PythonPlanBinder {
 			? configuredPlanTmpPath
 			: System.getProperty("java.io.tmpdir") + File.separator + "flink_plan_" + UUID.randomUUID();
 
-		tmpDistributedDir = new Path(globalConfig.getString(PythonOptions.DC_TMP_DIR));
-
 		operatorConfig = new Configuration();
 		operatorConfig.setString(PythonOptions.PYTHON_BINARY_PATH, globalConfig.getString(PythonOptions.PYTHON_BINARY_PATH));
 		String configuredTmpDataDir = globalConfig.getString(PythonOptions.DATA_TMP_DIR);
@@ -264,25 +261,18 @@ public class PythonPlanBinder {
 	 */
 	private enum Parameters {
 		DOP,
-		MODE,
 		RETRY,
 		ID
 	}
 
 	private void receiveParameters(ExecutionEnvironment env) throws IOException {
-		for (int x = 0; x < 4; x++) {
+		for (int x = 0; x < Parameters.values().length; x++) {
 			Tuple value = (Tuple) streamer.getRecord(true);
 			switch (Parameters.valueOf(((String) value.getField(0)).toUpperCase())) {
 				case DOP:
 					Integer dop = value.<Integer>getField(1);
 					env.setParallelism(dop);
 					break;
-				case MODE:
-					if (value.<Boolean>getField(1)) {
-						LOG.info("Local execution specified, using default for {}.", PythonOptions.DC_TMP_DIR);
-						tmpDistributedDir = new Path(PythonOptions.DC_TMP_DIR.defaultValue());
-					}
-					break;
 				case RETRY:
 					int retry = value.<Integer>getField(1);
 					env.setRestartStrategy(RestartStrategies.fixedDelayRestart(retry, 10000L));
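
The non-obvious part of the PythonPlanBinder hunk is the loop bound: with the MODE case removed, the hard-coded 4 would have read one record too many, so the count is now derived from the enum itself. A rough illustration of the same pattern in Python (the real code above is Java; this is only a sketch of the idea, not Flink code):

    from enum import Enum

    class Parameters(Enum):
        DOP = 1
        RETRY = 2
        ID = 3

    def receive_parameters(next_record):
        # The loop bound tracks the enum, so removing a member (like MODE
        # in this commit) cannot leave a stale literal behind.
        for _ in range(len(Parameters)):
            name, value = next_record()
            param = Parameters[name.upper()]
            print("received", param.name, value)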

http://git-wip-us.apache.org/repos/asf/flink/blob/0dc0e36f/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
index 797ae96..2cc6ecc 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
@@ -89,7 +89,6 @@ class Environment(object):
 
         #parameters
         self._dop = -1
-        self._local_mode = False
         self._retry = 0
 
         self._container = container
@@ -212,7 +211,6 @@ class Environment(object):
 
         The environment will execute all parts of the program that have resulted in a "sink" operation.
         """
-        self._local_mode = local
         self._optimize_plan()
 
         if self._container.is_planning():
@@ -326,7 +324,6 @@ class Environment(object):
     def _send_parameters(self):
         collect = self._collector.collect
         collect(("dop", self._dop))
-        collect(("mode", self._local_mode))
         collect(("retry", self._retry))
         collect(("id", self._env_id))