You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/04/21 15:47:46 UTC

[10/10] flink git commit: [Flink-671] Python API additions

[Flink-671] Python API additions

Fixes several minor issues
Requesting non-available data now throws an Exception
Python process shutdown is more reliable
Synchronization is done via TCP instead of UDP


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e1618e28
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e1618e28
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e1618e28

Branch: refs/heads/master
Commit: e1618e287885390c9ea1d4954ea18eeed5783b4a
Parents: 1d669da
Author: zentol <s....@web.de>
Authored: Wed Mar 4 17:20:00 2015 +0100
Committer: zentol <s....@web.de>
Committed: Tue Apr 21 14:02:54 2015 +0200

----------------------------------------------------------------------
 .../api/java/common/streaming/Streamer.java     |  64 ++++++---------
 .../java/python/streaming/PythonStreamer.java   |  78 +++++++++++++------
 .../languagebinding/api/python/dill/__diff.py   |  18 -----
 .../languagebinding/api/python/dill/__init__.py |  18 -----
 .../languagebinding/api/python/dill/_objects.py |  18 -----
 .../languagebinding/api/python/dill/detect.py   |  18 -----
 .../languagebinding/api/python/dill/dill.py     |  18 -----
 .../languagebinding/api/python/dill/info.py     |  17 ----
 .../languagebinding/api/python/dill/objtypes.py |  18 -----
 .../languagebinding/api/python/dill/pointers.py |  18 -----
 .../languagebinding/api/python/dill/source.py   |  18 -----
 .../languagebinding/api/python/dill/temp.py     |  18 -----
 .../languagebinding/api/python/executor.py      |  30 ++++---
 .../api/python/flink/connection/Connection.py   |  32 ++++----
 .../api/python/flink/connection/Iterator.py     |   4 +-
 .../api/python/flink/connection/__init__.pyc    | Bin 243 -> 0 bytes
 .../flink/functions/GroupReduceFunction.py      |   2 +-
 pom.xml                                         |   2 +
 18 files changed, 108 insertions(+), 283 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Streamer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Streamer.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Streamer.java
index 1a96e98..8b19425 100644
--- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Streamer.java
+++ b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Streamer.java
@@ -13,10 +13,11 @@
 package org.apache.flink.languagebinding.api.java.common.streaming;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.Serializable;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.util.Iterator;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
@@ -41,12 +42,13 @@ public abstract class Streamer implements Serializable {
 	private static final byte SIGNAL_LAST = 32;
 
 	private final byte[] buffer = new byte[4];
-	private DatagramPacket packet;
-	protected InetAddress host;
 
-	protected DatagramSocket socket;
-	protected int port1;
-	protected int port2;
+	protected ServerSocket server;
+	protected Socket socket;
+	protected InputStream in;
+	protected OutputStream out;
+	protected int port;
+
 	protected Sender sender;
 	protected Receiver receiver;
 
@@ -61,17 +63,8 @@ public abstract class Streamer implements Serializable {
 	}
 
 	public void open() throws IOException {
-		host = InetAddress.getByName("localhost");
-		packet = new DatagramPacket(buffer, 0, 4);
-		socket = new DatagramSocket(0, host);
-		socket.setSoTimeout(10000);
-		try {
-			setupProcess();
-			setupPorts();
-		} catch (SocketTimeoutException ste) {
-			throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
-		}
-		socket.setSoTimeout(300000);
+		server = new ServerSocket(0);
+		setupProcess();
 	}
 
 	/**
@@ -92,30 +85,17 @@ public abstract class Streamer implements Serializable {
 		receiver.close();
 	}
 
-	/**
-	 * Setups the required UDP-ports.The streamer requires two UDP-ports to send control-signals to, one each for
-	 * reading/writing operations.
-	 *
-	 * @throws IOException
-	 */
-	private void setupPorts() throws IOException, SocketTimeoutException {
-		socket.receive(new DatagramPacket(buffer, 0, 4));
-		checkForError();
-		port1 = getInt(buffer, 0);
-		socket.receive(new DatagramPacket(buffer, 0, 4));
-		checkForError();
-		port2 = getInt(buffer, 0);
-	}
-
 	private void sendWriteNotification(int size, boolean hasNext) throws IOException {
 		byte[] tmp = new byte[5];
 		putInt(tmp, 0, size);
 		tmp[4] = hasNext ? 0 : SIGNAL_LAST;
-		socket.send(new DatagramPacket(tmp, 0, 5, host, port1));
+		out.write(tmp, 0, 5);
+		out.flush();
 	}
 
 	private void sendReadConfirmation() throws IOException {
-		socket.send(new DatagramPacket(new byte[1], 0, 1, host, port2));
+		out.write(new byte[1], 0, 1);
+		out.flush();
 	}
 
 	private void checkForError() {
@@ -145,7 +125,7 @@ public abstract class Streamer implements Serializable {
 				names[x] = config.getString(PLANBINDER_CONFIG_BCVAR_NAME_PREFIX + x, null);
 			}
 
-			socket.receive(packet);
+			in.read(buffer, 0, 4);
 			checkForError();
 			int size = sender.sendRecord(broadcastCount);
 			sendWriteNotification(size, false);
@@ -153,13 +133,13 @@ public abstract class Streamer implements Serializable {
 			for (String name : names) {
 				Iterator bcv = function.getRuntimeContext().getBroadcastVariable(name).iterator();
 
-				socket.receive(packet);
+				in.read(buffer, 0, 4);
 				checkForError();
 				size = sender.sendRecord(name);
 				sendWriteNotification(size, false);
 
 				while (bcv.hasNext() || sender.hasRemaining(0)) {
-					socket.receive(packet);
+					in.read(buffer, 0, 4);
 					checkForError();
 					size = sender.sendBuffer(bcv, 0);
 					sendWriteNotification(size, bcv.hasNext() || sender.hasRemaining(0));
@@ -183,13 +163,15 @@ public abstract class Streamer implements Serializable {
 			int size;
 			if (i.hasNext()) {
 				while (true) {
-					socket.receive(packet);
+					in.read(buffer, 0, 4);
 					int sig = getInt(buffer, 0);
 					switch (sig) {
 						case SIGNAL_BUFFER_REQUEST:
 							if (i.hasNext() || sender.hasRemaining(0)) {
 								size = sender.sendBuffer(i, 0);
 								sendWriteNotification(size, sender.hasRemaining(0) || i.hasNext());
+							} else {
+								throw new RuntimeException("External process requested data even though none is available.");
 							}
 							break;
 						case SIGNAL_FINISHED:
@@ -226,7 +208,7 @@ public abstract class Streamer implements Serializable {
 			int size;
 			if (i1.hasNext() || i2.hasNext()) {
 				while (true) {
-					socket.receive(packet);
+					in.read(buffer, 0, 4);
 					int sig = getInt(buffer, 0);
 					switch (sig) {
 						case SIGNAL_BUFFER_REQUEST_G0:

http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java
index 835d95b..f636caf 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java
+++ b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java
@@ -13,7 +13,7 @@
 package org.apache.flink.languagebinding.api.java.python.streaming;
 
 import java.io.IOException;
-import java.net.DatagramPacket;
+import java.lang.reflect.Field;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import static org.apache.flink.languagebinding.api.java.common.PlanBinder.DEBUG;
 import static org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.FLINK_PYTHON_EXECUTOR_NAME;
@@ -38,6 +38,7 @@ public class PythonStreamer extends Streamer {
 	private final int id;
 	private final boolean usePython3;
 	private final boolean debug;
+	private Thread shutdownThread;
 
 	private String inputFilePath;
 	private String outputFilePath;
@@ -90,54 +91,62 @@ public class PythonStreamer extends Streamer {
 			} catch (IOException ex) {
 				throw new RuntimeException(FLINK_PYTHON3_BINARY_KEY + "=" + FLINK_PYTHON3_BINARY_PATH + " does not point to a valid python binary.");
 			}
-			pb.command(FLINK_PYTHON3_BINARY_PATH, "-O", "-B", executorPath, "" + socket.getLocalPort());
+			pb.command(FLINK_PYTHON3_BINARY_PATH, "-O", "-B", executorPath, "" + server.getLocalPort());
 		} else {
 			try {
 				Runtime.getRuntime().exec(FLINK_PYTHON2_BINARY_PATH);
 			} catch (IOException ex) {
 				throw new RuntimeException(FLINK_PYTHON2_BINARY_KEY + "=" + FLINK_PYTHON2_BINARY_PATH + " does not point to a valid python binary.");
 			}
-			pb.command(FLINK_PYTHON2_BINARY_PATH, "-O", "-B", executorPath, "" + socket.getLocalPort());
+			pb.command(FLINK_PYTHON2_BINARY_PATH, "-O", "-B", executorPath, "" + server.getLocalPort());
 		}
 		if (debug) {
 			socket.setSoTimeout(0);
 			LOG.info("Waiting for Python Process : " + function.getRuntimeContext().getTaskName()
-					+ " Run python /tmp/flink" + FLINK_PYTHON_EXECUTOR_NAME + " " + socket.getLocalPort());
+					+ " Run python /tmp/flink" + FLINK_PYTHON_EXECUTOR_NAME + " " + server.getLocalPort());
 		} else {
 			process = pb.start();
 			new StreamPrinter(process.getInputStream()).start();
 			new StreamPrinter(process.getErrorStream(), true, msg).start();
 		}
-		byte[] executorPort = new byte[4];
-		socket.receive(new DatagramPacket(executorPort, 0, 4));
-		int exPort = getInt(executorPort, 0);
-		if (exPort == -2) {
-			try { //wait before terminating to ensure that the complete error message is printed
-				Thread.sleep(2000);
-			} catch (InterruptedException ex) {
+
+		shutdownThread = new Thread() {
+			@Override
+			public void run() {
+				try {
+					destroyProcess();
+				} catch (IOException ex) {
+				}
 			}
-			throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely." + msg);
-		}
+		};
+
+		Runtime.getRuntime().addShutdownHook(shutdownThread);
+
+		socket = server.accept();
+		in = socket.getInputStream();
+		out = socket.getOutputStream();
 
 		byte[] opSize = new byte[4];
 		putInt(opSize, 0, operator.length);
-		socket.send(new DatagramPacket(opSize, 0, 4, host, exPort));
-		socket.send(new DatagramPacket(operator, 0, operator.length, host, exPort));
+		out.write(opSize, 0, 4);
+		out.write(operator, 0, operator.length);
 
 		byte[] meta = importString.toString().getBytes("utf-8");
 		putInt(opSize, 0, meta.length);
-		socket.send(new DatagramPacket(opSize, 0, 4, host, exPort));
-		socket.send(new DatagramPacket(meta, 0, meta.length, host, exPort));
+		out.write(opSize, 0, 4);
+		out.write(meta, 0, meta.length);
 
 		byte[] input = inputFilePath.getBytes("utf-8");
 		putInt(opSize, 0, input.length);
-		socket.send(new DatagramPacket(opSize, 0, 4, host, exPort));
-		socket.send(new DatagramPacket(input, 0, input.length, host, exPort));
+		out.write(opSize, 0, 4);
+		out.write(input, 0, input.length);
 
 		byte[] output = outputFilePath.getBytes("utf-8");
 		putInt(opSize, 0, output.length);
-		socket.send(new DatagramPacket(opSize, 0, 4, host, exPort));
-		socket.send(new DatagramPacket(output, 0, output.length, host, exPort));
+		out.write(opSize, 0, 4);
+		out.write(output, 0, output.length);
+
+		out.flush();
 
 		try { // wait a bit to catch syntax errors
 			Thread.sleep(2000);
@@ -165,9 +174,30 @@ public class PythonStreamer extends Streamer {
 			LOG.error("Exception occurred while closing Streamer. :" + e.getMessage());
 		}
 		if (!debug) {
-			try {
-				process.exitValue();
-			} catch (IllegalThreadStateException ise) { //process still active
+			destroyProcess();
+		}
+		if (shutdownThread != null) {
+			Runtime.getRuntime().removeShutdownHook(shutdownThread);
+		}
+	}
+
+	private void destroyProcess() throws IOException {
+		try {
+			process.exitValue();
+		} catch (IllegalThreadStateException ise) { //process still active
+			if (process.getClass().getName().equals("java.lang.UNIXProcess")) {
+				int pid;
+				try {
+					Field f = process.getClass().getDeclaredField("pid");
+					f.setAccessible(true);
+					pid = f.getInt(process);
+				} catch (Throwable e) {
+					process.destroy();
+					return;
+				}
+				String[] args = new String[]{"kill", "-9", "" + pid};
+				Runtime.getRuntime().exec(args);
+			} else {
 				process.destroy();
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__diff.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__diff.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__diff.py
index cebdc5d..79301a6 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__diff.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__diff.py
@@ -1,21 +1,3 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
 #!/usr/bin/env python
 #
 # Author: Mike McKerns (mmckerns @caltech and @uqfoundation)

http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__init__.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__init__.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__init__.py
index b4cbc34..b03eda9 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__init__.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__init__.py
@@ -1,21 +1,3 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
 #!/usr/bin/env python
 #
 # Author: Mike McKerns (mmckerns @caltech and @uqfoundation)

http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/_objects.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/_objects.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/_objects.py
index 457ae1e..b89bc0e 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/_objects.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/_objects.py
@@ -1,21 +1,3 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
 #!/usr/bin/env python
 #
 # Author: Mike McKerns (mmckerns @caltech and @uqfoundation)

http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/detect.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/detect.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/detect.py
index 76357a5..749a573 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/detect.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/detect.py
@@ -1,21 +1,3 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
 #!/usr/bin/env python
 #
 # Author: Mike McKerns (mmckerns @caltech and @uqfoundation)

http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/dill.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/dill.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/dill.py
index 238527c..cddb9ca 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/dill.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/dill.py
@@ -1,21 +1,3 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
 # -*- coding: utf-8 -*-
 #
 # Author: Mike McKerns (mmckerns @caltech and @uqfoundation)

http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/info.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/info.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/info.py
deleted file mode 100644
index 65b48d4..0000000
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/info.py
+++ /dev/null
@@ -1,17 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################

http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/objtypes.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/objtypes.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/objtypes.py
index a6f159e..bf0b557 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/objtypes.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/objtypes.py
@@ -1,21 +1,3 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
 #!/usr/bin/env python
 #
 # Author: Mike McKerns (mmckerns @caltech and @uqfoundation)

http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/pointers.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/pointers.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/pointers.py
index 2cad3be..25714ea 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/pointers.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/pointers.py
@@ -1,21 +1,3 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
 #!/usr/bin/env python
 #
 # Author: Mike McKerns (mmckerns @caltech and @uqfoundation)

http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/source.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/source.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/source.py
index b51c007..b55ca55 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/source.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/source.py
@@ -1,21 +1,3 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
 # #!/usr/bin/env python
 #
 # Author: Mike McKerns (mmckerns @caltech and @uqfoundation)

http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/temp.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/temp.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/temp.py
index 827efda..9dedb41 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/temp.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/temp.py
@@ -1,21 +1,3 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
 #!/usr/bin/env python
 #
 # Author: Mike McKerns (mmckerns @caltech and @uqfoundation)

http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/executor.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/executor.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/executor.py
index 9d80c82..2cfb9d3 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/executor.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/executor.py
@@ -20,39 +20,35 @@ import socket
 import struct
 #argv[1] = port
 
-
+s = None
 try:
     import dill
     port = int(sys.argv[1])
 
-    s1 = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
-    s1.bind((socket.gethostbyname("localhost"), 0))
-    s1.sendto(struct.pack(">i", s1.getsockname()[1]), (socket.gethostbyname("localhost"), port))
+    s = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
+    s.connect((socket.gethostbyname("localhost"), port))
 
-    size = struct.unpack(">i", s1.recv(4))[0]
-    serialized_operator = s1.recv(size)
+    size = struct.unpack(">i", s.recv(4, socket.MSG_WAITALL))[0]
+    serialized_operator = s.recv(size, socket.MSG_WAITALL)
 
-    size = struct.unpack(">i", s1.recv(4))[0]
-    import_string = s1.recv(size).decode("utf-8")
+    size = struct.unpack(">i", s.recv(4, socket.MSG_WAITALL))[0]
+    import_string = s.recv(size, socket.MSG_WAITALL).decode("utf-8")
 
-    size = struct.unpack(">i", s1.recv(4))[0]
-    input_file = s1.recv(size).decode("utf-8")
+    size = struct.unpack(">i", s.recv(4, socket.MSG_WAITALL))[0]
+    input_file = s.recv(size, socket.MSG_WAITALL).decode("utf-8")
 
-    size = struct.unpack(">i", s1.recv(4))[0]
-    output_file = s1.recv(size).decode("utf-8")
+    size = struct.unpack(">i", s.recv(4, socket.MSG_WAITALL))[0]
+    output_file = s.recv(size, socket.MSG_WAITALL).decode("utf-8")
 
     exec(import_string)
     
     operator = dill.loads(serialized_operator)
-    operator._configure(input_file, output_file, port)
+    operator._configure(input_file, output_file, s)
     operator._go()
     sys.stdout.flush()
     sys.stderr.flush()
 except:
     sys.stdout.flush()
     sys.stderr.flush()
-    s = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
-    s.bind((socket.gethostbyname("localhost"), 0))
-    destination = (socket.gethostbyname("localhost"), int(sys.argv[1]))
-    s.sendto(struct.pack(">i", -2), destination)
+    s.send(struct.pack(">i", -2))
     raise
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py
index 7d64352..cffe79e 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 import mmap
-import socket
+import socket as SOCKET
 import tempfile
 from struct import pack, unpack
 from collections import deque
@@ -61,18 +61,12 @@ class OneWayBusyBufferingMappedFileConnection(object):
 
 
 class BufferingUDPMappedFileConnection(object):
-    def __init__(self, input_file=tempfile.gettempdir() + "/flink_data/input", output_file=tempfile.gettempdir() + "/flink_data/output", port=25000):
+    def __init__(self, input_file=tempfile.gettempdir() + "/flink_data/input", output_file=tempfile.gettempdir() + "/flink_data/output", socket=None):
         self._input_file = open(input_file, "rb+")
         self._output_file = open(output_file, "rb+")
         self._file_input_buffer = mmap.mmap(self._input_file.fileno(), MAPPED_FILE_SIZE, mmap.MAP_SHARED, mmap.ACCESS_READ)
         self._file_output_buffer = mmap.mmap(self._output_file.fileno(), MAPPED_FILE_SIZE, mmap.MAP_SHARED, mmap.ACCESS_WRITE)
-        self._socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
-        self._socket.bind((socket.gethostbyname("localhost"), 0))
-        self._socket.settimeout(300)
-        self._destination = (socket.gethostbyname("localhost"), port)
-
-        self._socket.sendto(pack(">I", self._socket.getsockname()[1]), self._destination)
-        self._socket.sendto(pack(">I", self._socket.getsockname()[1]), self._destination)
+        self._socket = socket
 
         self._out = deque()
         self._out_size = 0
@@ -97,10 +91,10 @@ class BufferingUDPMappedFileConnection(object):
     def _write_buffer(self):
         self._file_output_buffer.seek(0, 0)
         self._file_output_buffer.write(b"".join(self._out))
-        self._socket.sendto(pack(">i", self._out_size), self._destination)
+        self._socket.send(pack(">i", self._out_size))
         self._out.clear()
         self._out_size = 0
-        self._socket.recvfrom(1)
+        self._socket.recv(1, SOCKET.MSG_WAITALL)
 
     def read(self, des_size, ignored=None):
         if self._input_size == self._input_offset:
@@ -110,10 +104,10 @@ class BufferingUDPMappedFileConnection(object):
         return self._input[old_offset:self._input_offset]
 
     def _read_buffer(self):
-        self._socket.sendto(SIGNAL_REQUEST_BUFFER, self._destination)
+        self._socket.send(SIGNAL_REQUEST_BUFFER)
         self._file_input_buffer.seek(0, 0)
         self._input_offset = 0
-        meta_size = self._socket.recvfrom(5)[0]
+        meta_size = self._socket.recv(5, SOCKET.MSG_WAITALL)
         self._input_size = unpack(">I", meta_size[:4])[0]
         self._was_last = meta_size[4] == SIGNAL_WAS_LAST
         self._input = self._file_input_buffer.read(self._input_size)
@@ -121,7 +115,7 @@ class BufferingUDPMappedFileConnection(object):
     def send_end_signal(self):
         if self._out_size:
             self._write_buffer()
-        self._socket.sendto(SIGNAL_FINISHED, self._destination)
+        self._socket.send(SIGNAL_FINISHED)
 
     def has_next(self, ignored=None):
         return not self._was_last or not self._input_size == self._input_offset
@@ -134,8 +128,8 @@ class BufferingUDPMappedFileConnection(object):
 
 
 class TwinBufferingUDPMappedFileConnection(BufferingUDPMappedFileConnection):
-    def __init__(self, input_file=tempfile.gettempdir() + "/flink/data/input", output_file=tempfile.gettempdir() + "/flink/data/output", port=25000):
-        super(TwinBufferingUDPMappedFileConnection, self).__init__(input_file, output_file, port)
+    def __init__(self, input_file=tempfile.gettempdir() + "/flink/data/input", output_file=tempfile.gettempdir() + "/flink/data/output", socket=None):
+        super(TwinBufferingUDPMappedFileConnection, self).__init__(input_file, output_file, socket)
         self._input = ["", ""]
         self._input_offset = [0, 0]
         self._input_size = [0, 0]
@@ -150,12 +144,12 @@ class TwinBufferingUDPMappedFileConnection(BufferingUDPMappedFileConnection):
 
     def _read_buffer(self, group):
         if group:
-            self._socket.sendto(SIGNAL_REQUEST_BUFFER_G1, self._destination)
+            self._socket.send(SIGNAL_REQUEST_BUFFER_G1)
         else:
-            self._socket.sendto(SIGNAL_REQUEST_BUFFER_G0, self._destination)
+            self._socket.send(SIGNAL_REQUEST_BUFFER_G0)
         self._file_input_buffer.seek(0, 0)
         self._input_offset[group] = 0
-        meta_size = self._socket.recvfrom(5)[0]
+        meta_size = self._socket.recv(5, SOCKET.MSG_WAITALL)
         self._input_size[group] = unpack(">I", meta_size[:4])[0]
         self._was_last[group] = meta_size[4] == SIGNAL_WAS_LAST
         self._input[group] = self._file_input_buffer.read(self._input_size[group])

http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Iterator.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Iterator.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Iterator.py
index 7b1c5c5..fb0e26d 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Iterator.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Iterator.py
@@ -260,7 +260,7 @@ class ByteArrayDeserializer(object):
 
     def deserialize(self):
         size = unpack(">i", self.read(4, self._group))[0]
-        return bytearray(self.read(size, self._group))
+        return bytearray(self.read(size, self._group)) if size else bytearray(b"")
 
 
 class BooleanDeserializer(object):
@@ -315,7 +315,7 @@ class StringDeserializer(object):
 
     def deserialize(self):
         length = unpack(">i", self.read(4, self._group))[0]
-        return self.read(length, self._group).decode("utf-8")
+        return self.read(length, self._group).decode("utf-8") if length else ""
 
 
 class NullDeserializer(object):

http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.pyc
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.pyc b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.pyc
deleted file mode 100644
index 7f3cf94..0000000
Binary files a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.pyc and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/GroupReduceFunction.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/GroupReduceFunction.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/GroupReduceFunction.py
index 35359e2..0ce23ff 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/GroupReduceFunction.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/GroupReduceFunction.py
@@ -103,7 +103,7 @@ class GroupReduceFunction(Function.Function):
         keys.sort()
         for key in keys:
             values = grouping[key]
-            for op in self._sort_ops:
+            for op in reversed(self._sort_ops):
                 values.sort(key=lambda x:x[op[0]], reverse = op[1] == Order.DESCENDING)
             result = function(Iterator.ListIterator(values), collector)
             if result is not None:

http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index fef5a9e..4232fb7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -717,6 +717,8 @@ under the License.
 						<exclude>flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/*.java</exclude>
 						<exclude>flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_csv</exclude>
 						<exclude>flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_text</exclude>
+						<!-- Python -->						
+						<exclude>flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/**</exclude>
 						<!-- Configuration Files. -->
 						<exclude>**/flink-bin/conf/slaves</exclude>
                         <exclude>flink-contrib/docker-flink/flink/conf/slaves</exclude>