You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/04/27 17:11:35 UTC
flink git commit: [FLINK-1924] minor refactoring of the Python API
Repository: flink
Updated Branches:
refs/heads/master fceb90a3e -> 247cad1cd
[FLINK-1924] minor refactoring of the Python API
- code formatting
- simpler Python process initialization
- renaming of the Python connection classes (UDP -> TCP) following the switch to TCP
This closes #616.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/247cad1c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/247cad1c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/247cad1c
Branch: refs/heads/master
Commit: 247cad1cda9cb06e95498ee2db6ae0095eac11bf
Parents: fceb90a
Author: zentol <s....@web.de>
Authored: Fri Mar 27 11:53:23 2015 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Mon Apr 27 17:01:51 2015 +0200
----------------------------------------------------------------------
.../api/java/common/PlanBinder.java | 6 +---
.../api/java/python/PythonPlanBinder.java | 35 ++++++++------------
.../java/python/streaming/PythonStreamer.java | 24 +++++---------
.../api/python/flink/connection/Connection.py | 14 ++++----
.../python/flink/functions/CoGroupFunction.py | 2 +-
.../api/python/flink/functions/Function.py | 2 +-
.../flink/functions/GroupReduceFunction.py | 4 +--
.../python/flink/functions/ReduceFunction.py | 4 +--
.../api/python/flink/plan/DataSet.py | 1 -
9 files changed, 35 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/247cad1c/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
index f701ab7..a1bd2e0 100644
--- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
+++ b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
@@ -54,11 +54,7 @@ public abstract class PlanBinder<INFO extends OperationInfo> {
public static boolean DEBUG = false;
- public static void setLocalMode() {
- FLINK_HDFS_PATH = System.getProperty("java.io.tmpdir") + "/flink";
- }
-
- protected HashMap<Integer, Object> sets;
+ protected HashMap<Integer, Object> sets = new HashMap();
public static ExecutionEnvironment env;
protected Receiver receiver;
http://git-wip-us.apache.org/repos/asf/flink/blob/247cad1c/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java
index d65931a..c278f5c 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java
+++ b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java
@@ -16,7 +16,6 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
-import java.util.HashMap;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
@@ -57,8 +56,8 @@ public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
public static final String FLINK_PYTHON2_BINARY_KEY = "python.binary.python2";
public static final String FLINK_PYTHON3_BINARY_KEY = "python.binary.python3";
- public static String FLINK_PYTHON2_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON2_BINARY_KEY, "python");
- public static String FLINK_PYTHON3_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON3_BINARY_KEY, "python3");
+ public static String FLINK_PYTHON2_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON2_BINARY_KEY, "python");
+ public static String FLINK_PYTHON3_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON3_BINARY_KEY, "python3");
private static final String FLINK_PYTHON_FILE_PATH = System.getProperty("java.io.tmpdir") + "/flink_plan";
protected static final String FLINK_PYTHON_REL_LOCAL_PATH = "/resources/python";
@@ -104,13 +103,14 @@ public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
split = x;
}
}
+
try {
prepareFiles(Arrays.copyOfRange(args, 0, split == 0 ? 1 : split));
startPython(Arrays.copyOfRange(args, split == 0 ? args.length : split + 1, args.length));
receivePlan();
if (env instanceof LocalEnvironment) {
- FLINK_HDFS_PATH = "file:/tmp/flink";
+ FLINK_HDFS_PATH = "file:" + System.getProperty("java.io.tmpdir") + "/flink";
}
distributeFiles(env);
@@ -172,7 +172,6 @@ public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
}
private void startPython(String[] args) throws IOException {
- sets = new HashMap();
StringBuilder argsBuilder = new StringBuilder();
for (String arg : args) {
argsBuilder.append(" ").append(arg);
@@ -180,23 +179,15 @@ public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
receiver = new Receiver(null);
receiver.open(null);
- if (usePython3) {
- try {
- Runtime.getRuntime().exec(FLINK_PYTHON3_BINARY_PATH);
- } catch (IOException ex) {
- throw new RuntimeException(FLINK_PYTHON3_BINARY_KEY + "=" + FLINK_PYTHON3_BINARY_PATH + " does not point to a valid python binary.");
- }
- process = Runtime.getRuntime().exec(FLINK_PYTHON3_BINARY_PATH + " -B "
- + FLINK_PYTHON_FILE_PATH + FLINK_PYTHON_PLAN_NAME + argsBuilder.toString());
- } else {
- try {
- Runtime.getRuntime().exec(FLINK_PYTHON2_BINARY_PATH);
- } catch (IOException ex) {
- throw new RuntimeException(FLINK_PYTHON2_BINARY_KEY + "=" + FLINK_PYTHON2_BINARY_PATH + " does not point to a valid python binary.");
- }
- process = Runtime.getRuntime().exec(FLINK_PYTHON2_BINARY_PATH + " -B "
- + FLINK_PYTHON_FILE_PATH + FLINK_PYTHON_PLAN_NAME + argsBuilder.toString());
+ String pythonBinaryPath = usePython3 ? FLINK_PYTHON3_BINARY_PATH : FLINK_PYTHON2_BINARY_PATH;
+
+ try {
+ Runtime.getRuntime().exec(pythonBinaryPath);
+ } catch (IOException ex) {
+ throw new RuntimeException(pythonBinaryPath + " does not point to a valid python binary.");
}
+ process = Runtime.getRuntime().exec(pythonBinaryPath + " -B " + FLINK_PYTHON_FILE_PATH + FLINK_PYTHON_PLAN_NAME + argsBuilder.toString());
+
new StreamPrinter(process.getInputStream()).start();
new StreamPrinter(process.getErrorStream()).start();
@@ -210,7 +201,7 @@ public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
if (value != 0) {
throw new RuntimeException("Plan file caused an error. Check log-files for details.");
}
- } catch (IllegalThreadStateException ise) {
+ } catch (IllegalThreadStateException ise) {//Process still running
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/247cad1c/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java
index f636caf..7dc2240 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java
+++ b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java
@@ -23,9 +23,7 @@ import static org.apache.flink.languagebinding.api.java.common.PlanBinder.FLINK_
import org.apache.flink.languagebinding.api.java.common.streaming.StreamPrinter;
import org.apache.flink.languagebinding.api.java.common.streaming.Streamer;
import org.apache.flink.languagebinding.api.java.python.PythonPlanBinder;
-import static org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.FLINK_PYTHON2_BINARY_KEY;
import static org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH;
-import static org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.FLINK_PYTHON3_BINARY_KEY;
import static org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH;
/**
@@ -85,21 +83,15 @@ public class PythonStreamer extends Streamer {
importString.append(FLINK_PYTHON_PLAN_NAME.substring(1, FLINK_PYTHON_PLAN_NAME.length() - 3));
}
- if (usePython3) {
- try {
- Runtime.getRuntime().exec(FLINK_PYTHON3_BINARY_PATH);
- } catch (IOException ex) {
- throw new RuntimeException(FLINK_PYTHON3_BINARY_KEY + "=" + FLINK_PYTHON3_BINARY_PATH + " does not point to a valid python binary.");
- }
- pb.command(FLINK_PYTHON3_BINARY_PATH, "-O", "-B", executorPath, "" + server.getLocalPort());
- } else {
- try {
- Runtime.getRuntime().exec(FLINK_PYTHON2_BINARY_PATH);
- } catch (IOException ex) {
- throw new RuntimeException(FLINK_PYTHON2_BINARY_KEY + "=" + FLINK_PYTHON2_BINARY_PATH + " does not point to a valid python binary.");
- }
- pb.command(FLINK_PYTHON2_BINARY_PATH, "-O", "-B", executorPath, "" + server.getLocalPort());
+ String pythonBinaryPath = usePython3 ? FLINK_PYTHON3_BINARY_PATH : FLINK_PYTHON2_BINARY_PATH;
+
+ try {
+ Runtime.getRuntime().exec(pythonBinaryPath);
+ } catch (IOException ex) {
+ throw new RuntimeException(pythonBinaryPath + " does not point to a valid python binary.");
}
+ pb.command(pythonBinaryPath, "-O", "-B", executorPath, "" + server.getLocalPort());
+
if (debug) {
socket.setSoTimeout(0);
LOG.info("Waiting for Python Process : " + function.getRuntimeContext().getTaskName()
http://git-wip-us.apache.org/repos/asf/flink/blob/247cad1c/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py
index cffe79e..7ccc995 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py
@@ -60,7 +60,7 @@ class OneWayBusyBufferingMappedFileConnection(object):
self._file_output_buffer.write(b'\x01')
-class BufferingUDPMappedFileConnection(object):
+class BufferingTCPMappedFileConnection(object):
def __init__(self, input_file=tempfile.gettempdir() + "/flink_data/input", output_file=tempfile.gettempdir() + "/flink_data/output", socket=None):
self._input_file = open(input_file, "rb+")
self._output_file = open(output_file, "rb+")
@@ -71,7 +71,7 @@ class BufferingUDPMappedFileConnection(object):
self._out = deque()
self._out_size = 0
- self._input = ""
+ self._input = b""
self._input_offset = 0
self._input_size = 0
self._was_last = False
@@ -124,13 +124,13 @@ class BufferingUDPMappedFileConnection(object):
self._was_last = False
self._input_size = 0
self._input_offset = 0
- self._input = ""
+ self._input = b""
-class TwinBufferingUDPMappedFileConnection(BufferingUDPMappedFileConnection):
+class TwinBufferingTCPMappedFileConnection(BufferingTCPMappedFileConnection):
def __init__(self, input_file=tempfile.gettempdir() + "/flink/data/input", output_file=tempfile.gettempdir() + "/flink/data/output", socket=None):
- super(TwinBufferingUDPMappedFileConnection, self).__init__(input_file, output_file, socket)
- self._input = ["", ""]
+ super(TwinBufferingTCPMappedFileConnection, self).__init__(input_file, output_file, socket)
+ self._input = [b"", b""]
self._input_offset = [0, 0]
self._input_size = [0, 0]
self._was_last = [False, False]
@@ -161,6 +161,6 @@ class TwinBufferingUDPMappedFileConnection(BufferingUDPMappedFileConnection):
self._was_last = [False, False]
self._input_size = [0, 0]
self._input_offset = [0, 0]
- self._input = ["", ""]
+ self._input = [b"", b""]
http://git-wip-us.apache.org/repos/asf/flink/blob/247cad1c/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/CoGroupFunction.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/CoGroupFunction.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/CoGroupFunction.py
index c39caf7..db951fe 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/CoGroupFunction.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/CoGroupFunction.py
@@ -26,7 +26,7 @@ class CoGroupFunction(Function.Function):
self._keys2 = None
def _configure(self, input_file, output_file, port):
- self._connection = Connection.TwinBufferingUDPMappedFileConnection(input_file, output_file, port)
+ self._connection = Connection.TwinBufferingTCPMappedFileConnection(input_file, output_file, port)
self._iterator = Iterator.Iterator(self._connection, 0)
self._iterator2 = Iterator.Iterator(self._connection, 1)
self._cgiter = Iterator.CoGroupIterator(self._iterator, self._iterator2, self._keys1, self._keys2)
http://git-wip-us.apache.org/repos/asf/flink/blob/247cad1c/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/Function.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/Function.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/Function.py
index 8e5227e..dfbb5c5 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/Function.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/Function.py
@@ -35,7 +35,7 @@ class Function(object):
self._meta = None
def _configure(self, input_file, output_file, port):
- self._connection = Connection.BufferingUDPMappedFileConnection(input_file, output_file, port)
+ self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
self._iterator = Iterator.Iterator(self._connection)
self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
self._configure_chain(Collector.Collector(self._connection))
http://git-wip-us.apache.org/repos/asf/flink/blob/247cad1c/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/GroupReduceFunction.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/GroupReduceFunction.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/GroupReduceFunction.py
index 0ce23ff..11bba30 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/GroupReduceFunction.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/GroupReduceFunction.py
@@ -31,13 +31,13 @@ class GroupReduceFunction(Function.Function):
def _configure(self, input_file, output_file, port):
if self._combine:
- self._connection = Connection.BufferingUDPMappedFileConnection(input_file, output_file, port)
+ self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
self._iterator = Iterator.Iterator(self._connection)
self._collector = Collector.Collector(self._connection)
self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
self._run = self._run_combine
else:
- self._connection = Connection.BufferingUDPMappedFileConnection(input_file, output_file, port)
+ self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
self._iterator = Iterator.Iterator(self._connection)
self._group_iterator = Iterator.GroupIterator(self._iterator, self._keys)
self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
http://git-wip-us.apache.org/repos/asf/flink/blob/247cad1c/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/ReduceFunction.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/ReduceFunction.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/ReduceFunction.py
index 7da1ef4..ffa6de0 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/ReduceFunction.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/ReduceFunction.py
@@ -29,13 +29,13 @@ class ReduceFunction(Function.Function):
def _configure(self, input_file, output_file, port):
if self._combine:
- self._connection = Connection.BufferingUDPMappedFileConnection(input_file, output_file, port)
+ self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
self._iterator = Iterator.Iterator(self._connection)
self._collector = Collector.Collector(self._connection)
self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector)
self._run = self._run_combine
else:
- self._connection = Connection.BufferingUDPMappedFileConnection(input_file, output_file, port)
+ self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
self._iterator = Iterator.Iterator(self._connection)
if self._keys is None:
self._run = self._run_allreduce
http://git-wip-us.apache.org/repos/asf/flink/blob/247cad1c/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/DataSet.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/DataSet.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/DataSet.py
index a906fb2..390a08d 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/DataSet.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/DataSet.py
@@ -30,7 +30,6 @@ from flink.functions.MapFunction import MapFunction
from flink.functions.MapPartitionFunction import MapPartitionFunction
from flink.functions.ReduceFunction import ReduceFunction
-
def deduct_output_type(dataset):
skip = set([_Identifier.GROUP, _Identifier.SORT, _Identifier.UNION])
source = set([_Identifier.SOURCE_CSV, _Identifier.SOURCE_TEXT, _Identifier.SOURCE_VALUE])