You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/04/27 17:11:35 UTC

flink git commit: [FLINK-1924] minor refactoring of the Python API

Repository: flink
Updated Branches:
  refs/heads/master fceb90a3e -> 247cad1cd


[FLINK-1924] minor refactoring of the Python API

- code formatting
- simpler Python process initialization
- renaming of the Python connection classes (UDP -> TCP) following the switch to TCP

This closes #616.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/247cad1c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/247cad1c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/247cad1c

Branch: refs/heads/master
Commit: 247cad1cda9cb06e95498ee2db6ae0095eac11bf
Parents: fceb90a
Author: zentol <s....@web.de>
Authored: Fri Mar 27 11:53:23 2015 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Mon Apr 27 17:01:51 2015 +0200

----------------------------------------------------------------------
 .../api/java/common/PlanBinder.java             |  6 +---
 .../api/java/python/PythonPlanBinder.java       | 35 ++++++++------------
 .../java/python/streaming/PythonStreamer.java   | 24 +++++---------
 .../api/python/flink/connection/Connection.py   | 14 ++++----
 .../python/flink/functions/CoGroupFunction.py   |  2 +-
 .../api/python/flink/functions/Function.py      |  2 +-
 .../flink/functions/GroupReduceFunction.py      |  4 +--
 .../python/flink/functions/ReduceFunction.py    |  4 +--
 .../api/python/flink/plan/DataSet.py            |  1 -
 9 files changed, 35 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/247cad1c/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
index f701ab7..a1bd2e0 100644
--- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
+++ b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
@@ -54,11 +54,7 @@ public abstract class PlanBinder<INFO extends OperationInfo> {
 
 	public static boolean DEBUG = false;
 
-	public static void setLocalMode() {
-		FLINK_HDFS_PATH = System.getProperty("java.io.tmpdir") + "/flink";
-	}
-
-	protected HashMap<Integer, Object> sets;
+	protected HashMap<Integer, Object> sets = new HashMap();
 	public static ExecutionEnvironment env;
 	protected Receiver receiver;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/247cad1c/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java
index d65931a..c278f5c 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java
+++ b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java
@@ -16,7 +16,6 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Arrays;
-import java.util.HashMap;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.LocalEnvironment;
@@ -57,8 +56,8 @@ public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
 
 	public static final String FLINK_PYTHON2_BINARY_KEY = "python.binary.python2";
 	public static final String FLINK_PYTHON3_BINARY_KEY = "python.binary.python3";
-	public static  String FLINK_PYTHON2_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON2_BINARY_KEY, "python");
-	public static  String FLINK_PYTHON3_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON3_BINARY_KEY, "python3");
+	public static String FLINK_PYTHON2_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON2_BINARY_KEY, "python");
+	public static String FLINK_PYTHON3_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON3_BINARY_KEY, "python3");
 
 	private static final String FLINK_PYTHON_FILE_PATH = System.getProperty("java.io.tmpdir") + "/flink_plan";
 	protected static final String FLINK_PYTHON_REL_LOCAL_PATH = "/resources/python";
@@ -104,13 +103,14 @@ public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
 				split = x;
 			}
 		}
+
 		try {
 			prepareFiles(Arrays.copyOfRange(args, 0, split == 0 ? 1 : split));
 			startPython(Arrays.copyOfRange(args, split == 0 ? args.length : split + 1, args.length));
 			receivePlan();
 
 			if (env instanceof LocalEnvironment) {
-				FLINK_HDFS_PATH = "file:/tmp/flink";
+				FLINK_HDFS_PATH = "file:" + System.getProperty("java.io.tmpdir") + "/flink";
 			}
 
 			distributeFiles(env);
@@ -172,7 +172,6 @@ public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
 	}
 
 	private void startPython(String[] args) throws IOException {
-		sets = new HashMap();
 		StringBuilder argsBuilder = new StringBuilder();
 		for (String arg : args) {
 			argsBuilder.append(" ").append(arg);
@@ -180,23 +179,15 @@ public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
 		receiver = new Receiver(null);
 		receiver.open(null);
 
-		if (usePython3) {
-			try {
-				Runtime.getRuntime().exec(FLINK_PYTHON3_BINARY_PATH);
-			} catch (IOException ex) {
-				throw new RuntimeException(FLINK_PYTHON3_BINARY_KEY + "=" + FLINK_PYTHON3_BINARY_PATH + " does not point to a valid python binary.");
-			}
-			process = Runtime.getRuntime().exec(FLINK_PYTHON3_BINARY_PATH + " -B "
-					+ FLINK_PYTHON_FILE_PATH + FLINK_PYTHON_PLAN_NAME + argsBuilder.toString());
-		} else {
-			try {
-				Runtime.getRuntime().exec(FLINK_PYTHON2_BINARY_PATH);
-			} catch (IOException ex) {
-				throw new RuntimeException(FLINK_PYTHON2_BINARY_KEY + "=" + FLINK_PYTHON2_BINARY_PATH + " does not point to a valid python binary.");
-			}
-			process = Runtime.getRuntime().exec(FLINK_PYTHON2_BINARY_PATH + " -B "
-					+ FLINK_PYTHON_FILE_PATH + FLINK_PYTHON_PLAN_NAME + argsBuilder.toString());
+		String pythonBinaryPath = usePython3 ? FLINK_PYTHON3_BINARY_PATH : FLINK_PYTHON2_BINARY_PATH;
+
+		try {
+			Runtime.getRuntime().exec(pythonBinaryPath);
+		} catch (IOException ex) {
+			throw new RuntimeException(pythonBinaryPath + " does not point to a valid python binary.");
 		}
+		process = Runtime.getRuntime().exec(pythonBinaryPath + " -B " + FLINK_PYTHON_FILE_PATH + FLINK_PYTHON_PLAN_NAME + argsBuilder.toString());
+
 		new StreamPrinter(process.getInputStream()).start();
 		new StreamPrinter(process.getErrorStream()).start();
 
@@ -210,7 +201,7 @@ public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
 			if (value != 0) {
 				throw new RuntimeException("Plan file caused an error. Check log-files for details.");
 			}
-		} catch (IllegalThreadStateException ise) {
+		} catch (IllegalThreadStateException ise) {//Process still running
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/247cad1c/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java
index f636caf..7dc2240 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java
+++ b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java
@@ -23,9 +23,7 @@ import static org.apache.flink.languagebinding.api.java.common.PlanBinder.FLINK_
 import org.apache.flink.languagebinding.api.java.common.streaming.StreamPrinter;
 import org.apache.flink.languagebinding.api.java.common.streaming.Streamer;
 import org.apache.flink.languagebinding.api.java.python.PythonPlanBinder;
-import static org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.FLINK_PYTHON2_BINARY_KEY;
 import static org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH;
-import static org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.FLINK_PYTHON3_BINARY_KEY;
 import static org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH;
 
 /**
@@ -85,21 +83,15 @@ public class PythonStreamer extends Streamer {
 			importString.append(FLINK_PYTHON_PLAN_NAME.substring(1, FLINK_PYTHON_PLAN_NAME.length() - 3));
 		}
 
-		if (usePython3) {
-			try {
-				Runtime.getRuntime().exec(FLINK_PYTHON3_BINARY_PATH);
-			} catch (IOException ex) {
-				throw new RuntimeException(FLINK_PYTHON3_BINARY_KEY + "=" + FLINK_PYTHON3_BINARY_PATH + " does not point to a valid python binary.");
-			}
-			pb.command(FLINK_PYTHON3_BINARY_PATH, "-O", "-B", executorPath, "" + server.getLocalPort());
-		} else {
-			try {
-				Runtime.getRuntime().exec(FLINK_PYTHON2_BINARY_PATH);
-			} catch (IOException ex) {
-				throw new RuntimeException(FLINK_PYTHON2_BINARY_KEY + "=" + FLINK_PYTHON2_BINARY_PATH + " does not point to a valid python binary.");
-			}
-			pb.command(FLINK_PYTHON2_BINARY_PATH, "-O", "-B", executorPath, "" + server.getLocalPort());
+		String pythonBinaryPath = usePython3 ? FLINK_PYTHON3_BINARY_PATH : FLINK_PYTHON2_BINARY_PATH;
+
+		try {
+			Runtime.getRuntime().exec(pythonBinaryPath);
+		} catch (IOException ex) {
+			throw new RuntimeException(pythonBinaryPath + " does not point to a valid python binary.");
 		}
+		pb.command(pythonBinaryPath, "-O", "-B", executorPath, "" + server.getLocalPort());
+
 		if (debug) {
 			socket.setSoTimeout(0);
 			LOG.info("Waiting for Python Process : " + function.getRuntimeContext().getTaskName()

http://git-wip-us.apache.org/repos/asf/flink/blob/247cad1c/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py
index cffe79e..7ccc995 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py
@@ -60,7 +60,7 @@ class OneWayBusyBufferingMappedFileConnection(object):
         self._file_output_buffer.write(b'\x01')
 
 
-class BufferingUDPMappedFileConnection(object):
+class BufferingTCPMappedFileConnection(object):
     def __init__(self, input_file=tempfile.gettempdir() + "/flink_data/input", output_file=tempfile.gettempdir() + "/flink_data/output", socket=None):
         self._input_file = open(input_file, "rb+")
         self._output_file = open(output_file, "rb+")
@@ -71,7 +71,7 @@ class BufferingUDPMappedFileConnection(object):
         self._out = deque()
         self._out_size = 0
 
-        self._input = ""
+        self._input = b""
         self._input_offset = 0
         self._input_size = 0
         self._was_last = False
@@ -124,13 +124,13 @@ class BufferingUDPMappedFileConnection(object):
         self._was_last = False
         self._input_size = 0
         self._input_offset = 0
-        self._input = ""
+        self._input = b""
 
 
-class TwinBufferingUDPMappedFileConnection(BufferingUDPMappedFileConnection):
+class TwinBufferingTCPMappedFileConnection(BufferingTCPMappedFileConnection):
     def __init__(self, input_file=tempfile.gettempdir() + "/flink/data/input", output_file=tempfile.gettempdir() + "/flink/data/output", socket=None):
-        super(TwinBufferingUDPMappedFileConnection, self).__init__(input_file, output_file, socket)
-        self._input = ["", ""]
+        super(TwinBufferingTCPMappedFileConnection, self).__init__(input_file, output_file, socket)
+        self._input = [b"", b""]
         self._input_offset = [0, 0]
         self._input_size = [0, 0]
         self._was_last = [False, False]
@@ -161,6 +161,6 @@ class TwinBufferingUDPMappedFileConnection(BufferingUDPMappedFileConnection):
         self._was_last = [False, False]
         self._input_size = [0, 0]
         self._input_offset = [0, 0]
-        self._input = ["", ""]
+        self._input = [b"", b""]
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/247cad1c/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/CoGroupFunction.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/CoGroupFunction.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/CoGroupFunction.py
index c39caf7..db951fe 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/CoGroupFunction.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/CoGroupFunction.py
@@ -26,7 +26,7 @@ class CoGroupFunction(Function.Function):
         self._keys2 = None
 
     def _configure(self, input_file, output_file, port):
-        self._connection = Connection.TwinBufferingUDPMappedFileConnection(input_file, output_file, port)
+        self._connection = Connection.TwinBufferingTCPMappedFileConnection(input_file, output_file, port)
         self._iterator = Iterator.Iterator(self._connection, 0)
         self._iterator2 = Iterator.Iterator(self._connection, 1)
         self._cgiter = Iterator.CoGroupIterator(self._iterator, self._iterator2, self._keys1, self._keys2)

http://git-wip-us.apache.org/repos/asf/flink/blob/247cad1c/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/Function.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/Function.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/Function.py
index 8e5227e..dfbb5c5 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/Function.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/Function.py
@@ -35,7 +35,7 @@ class Function(object):
         self._meta = None
 
     def _configure(self, input_file, output_file, port):
-        self._connection = Connection.BufferingUDPMappedFileConnection(input_file, output_file, port)
+        self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
         self._iterator = Iterator.Iterator(self._connection)
         self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
         self._configure_chain(Collector.Collector(self._connection))

http://git-wip-us.apache.org/repos/asf/flink/blob/247cad1c/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/GroupReduceFunction.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/GroupReduceFunction.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/GroupReduceFunction.py
index 0ce23ff..11bba30 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/GroupReduceFunction.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/GroupReduceFunction.py
@@ -31,13 +31,13 @@ class GroupReduceFunction(Function.Function):
 
     def _configure(self, input_file, output_file, port):
         if self._combine:
-            self._connection = Connection.BufferingUDPMappedFileConnection(input_file, output_file, port)
+            self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
             self._iterator = Iterator.Iterator(self._connection)
             self._collector = Collector.Collector(self._connection)
             self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
             self._run = self._run_combine
         else:
-            self._connection = Connection.BufferingUDPMappedFileConnection(input_file, output_file, port)
+            self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
             self._iterator = Iterator.Iterator(self._connection)
             self._group_iterator = Iterator.GroupIterator(self._iterator, self._keys)
             self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)

http://git-wip-us.apache.org/repos/asf/flink/blob/247cad1c/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/ReduceFunction.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/ReduceFunction.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/ReduceFunction.py
index 7da1ef4..ffa6de0 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/ReduceFunction.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/ReduceFunction.py
@@ -29,13 +29,13 @@ class ReduceFunction(Function.Function):
 
     def _configure(self, input_file, output_file, port):
         if self._combine:
-            self._connection = Connection.BufferingUDPMappedFileConnection(input_file, output_file, port)
+            self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
             self._iterator = Iterator.Iterator(self._connection)
             self._collector = Collector.Collector(self._connection)
             self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
             self._run = self._run_combine
         else:
-            self._connection = Connection.BufferingUDPMappedFileConnection(input_file, output_file, port)
+            self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
             self._iterator = Iterator.Iterator(self._connection)
             if self._keys is None:
                 self._run = self._run_allreduce

http://git-wip-us.apache.org/repos/asf/flink/blob/247cad1c/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/DataSet.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/DataSet.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/DataSet.py
index a906fb2..390a08d 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/DataSet.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/DataSet.py
@@ -30,7 +30,6 @@ from flink.functions.MapFunction import MapFunction
 from flink.functions.MapPartitionFunction import MapPartitionFunction
 from flink.functions.ReduceFunction import ReduceFunction
 
-
 def deduct_output_type(dataset):
     skip = set([_Identifier.GROUP, _Identifier.SORT, _Identifier.UNION])
     source = set([_Identifier.SOURCE_CSV, _Identifier.SOURCE_TEXT, _Identifier.SOURCE_VALUE])