You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/04/21 15:47:46 UTC
[10/10] flink git commit: [Flink-671] Python API additions
[Flink-671] Python API additions
Fixes several minor issues
Requesting unavailable data now throws an exception
Python process shutdown is more reliable
Synchronization done via TCP
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e1618e28
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e1618e28
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e1618e28
Branch: refs/heads/master
Commit: e1618e287885390c9ea1d4954ea18eeed5783b4a
Parents: 1d669da
Author: zentol <s....@web.de>
Authored: Wed Mar 4 17:20:00 2015 +0100
Committer: zentol <s....@web.de>
Committed: Tue Apr 21 14:02:54 2015 +0200
----------------------------------------------------------------------
.../api/java/common/streaming/Streamer.java | 64 ++++++---------
.../java/python/streaming/PythonStreamer.java | 78 +++++++++++++------
.../languagebinding/api/python/dill/__diff.py | 18 -----
.../languagebinding/api/python/dill/__init__.py | 18 -----
.../languagebinding/api/python/dill/_objects.py | 18 -----
.../languagebinding/api/python/dill/detect.py | 18 -----
.../languagebinding/api/python/dill/dill.py | 18 -----
.../languagebinding/api/python/dill/info.py | 17 ----
.../languagebinding/api/python/dill/objtypes.py | 18 -----
.../languagebinding/api/python/dill/pointers.py | 18 -----
.../languagebinding/api/python/dill/source.py | 18 -----
.../languagebinding/api/python/dill/temp.py | 18 -----
.../languagebinding/api/python/executor.py | 30 ++++---
.../api/python/flink/connection/Connection.py | 32 ++++----
.../api/python/flink/connection/Iterator.py | 4 +-
.../api/python/flink/connection/__init__.pyc | Bin 243 -> 0 bytes
.../flink/functions/GroupReduceFunction.py | 2 +-
pom.xml | 2 +
18 files changed, 108 insertions(+), 283 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Streamer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Streamer.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Streamer.java
index 1a96e98..8b19425 100644
--- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Streamer.java
+++ b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Streamer.java
@@ -13,10 +13,11 @@
package org.apache.flink.languagebinding.api.java.common.streaming;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.io.Serializable;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Iterator;
import org.apache.flink.api.common.functions.AbstractRichFunction;
@@ -41,12 +42,13 @@ public abstract class Streamer implements Serializable {
private static final byte SIGNAL_LAST = 32;
private final byte[] buffer = new byte[4];
- private DatagramPacket packet;
- protected InetAddress host;
- protected DatagramSocket socket;
- protected int port1;
- protected int port2;
+ protected ServerSocket server;
+ protected Socket socket;
+ protected InputStream in;
+ protected OutputStream out;
+ protected int port;
+
protected Sender sender;
protected Receiver receiver;
@@ -61,17 +63,8 @@ public abstract class Streamer implements Serializable {
}
public void open() throws IOException {
- host = InetAddress.getByName("localhost");
- packet = new DatagramPacket(buffer, 0, 4);
- socket = new DatagramSocket(0, host);
- socket.setSoTimeout(10000);
- try {
- setupProcess();
- setupPorts();
- } catch (SocketTimeoutException ste) {
- throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
- }
- socket.setSoTimeout(300000);
+ server = new ServerSocket(0);
+ setupProcess();
}
/**
@@ -92,30 +85,17 @@ public abstract class Streamer implements Serializable {
receiver.close();
}
- /**
- * Setups the required UDP-ports.The streamer requires two UDP-ports to send control-signals to, one each for
- * reading/writing operations.
- *
- * @throws IOException
- */
- private void setupPorts() throws IOException, SocketTimeoutException {
- socket.receive(new DatagramPacket(buffer, 0, 4));
- checkForError();
- port1 = getInt(buffer, 0);
- socket.receive(new DatagramPacket(buffer, 0, 4));
- checkForError();
- port2 = getInt(buffer, 0);
- }
-
private void sendWriteNotification(int size, boolean hasNext) throws IOException {
byte[] tmp = new byte[5];
putInt(tmp, 0, size);
tmp[4] = hasNext ? 0 : SIGNAL_LAST;
- socket.send(new DatagramPacket(tmp, 0, 5, host, port1));
+ out.write(tmp, 0, 5);
+ out.flush();
}
private void sendReadConfirmation() throws IOException {
- socket.send(new DatagramPacket(new byte[1], 0, 1, host, port2));
+ out.write(new byte[1], 0, 1);
+ out.flush();
}
private void checkForError() {
@@ -145,7 +125,7 @@ public abstract class Streamer implements Serializable {
names[x] = config.getString(PLANBINDER_CONFIG_BCVAR_NAME_PREFIX + x, null);
}
- socket.receive(packet);
+ in.read(buffer, 0, 4);
checkForError();
int size = sender.sendRecord(broadcastCount);
sendWriteNotification(size, false);
@@ -153,13 +133,13 @@ public abstract class Streamer implements Serializable {
for (String name : names) {
Iterator bcv = function.getRuntimeContext().getBroadcastVariable(name).iterator();
- socket.receive(packet);
+ in.read(buffer, 0, 4);
checkForError();
size = sender.sendRecord(name);
sendWriteNotification(size, false);
while (bcv.hasNext() || sender.hasRemaining(0)) {
- socket.receive(packet);
+ in.read(buffer, 0, 4);
checkForError();
size = sender.sendBuffer(bcv, 0);
sendWriteNotification(size, bcv.hasNext() || sender.hasRemaining(0));
@@ -183,13 +163,15 @@ public abstract class Streamer implements Serializable {
int size;
if (i.hasNext()) {
while (true) {
- socket.receive(packet);
+ in.read(buffer, 0, 4);
int sig = getInt(buffer, 0);
switch (sig) {
case SIGNAL_BUFFER_REQUEST:
if (i.hasNext() || sender.hasRemaining(0)) {
size = sender.sendBuffer(i, 0);
sendWriteNotification(size, sender.hasRemaining(0) || i.hasNext());
+ } else {
+ throw new RuntimeException("External process requested data even though none is available.");
}
break;
case SIGNAL_FINISHED:
@@ -226,7 +208,7 @@ public abstract class Streamer implements Serializable {
int size;
if (i1.hasNext() || i2.hasNext()) {
while (true) {
- socket.receive(packet);
+ in.read(buffer, 0, 4);
int sig = getInt(buffer, 0);
switch (sig) {
case SIGNAL_BUFFER_REQUEST_G0:
http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java
index 835d95b..f636caf 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java
+++ b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java
@@ -13,7 +13,7 @@
package org.apache.flink.languagebinding.api.java.python.streaming;
import java.io.IOException;
-import java.net.DatagramPacket;
+import java.lang.reflect.Field;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import static org.apache.flink.languagebinding.api.java.common.PlanBinder.DEBUG;
import static org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.FLINK_PYTHON_EXECUTOR_NAME;
@@ -38,6 +38,7 @@ public class PythonStreamer extends Streamer {
private final int id;
private final boolean usePython3;
private final boolean debug;
+ private Thread shutdownThread;
private String inputFilePath;
private String outputFilePath;
@@ -90,54 +91,62 @@ public class PythonStreamer extends Streamer {
} catch (IOException ex) {
throw new RuntimeException(FLINK_PYTHON3_BINARY_KEY + "=" + FLINK_PYTHON3_BINARY_PATH + " does not point to a valid python binary.");
}
- pb.command(FLINK_PYTHON3_BINARY_PATH, "-O", "-B", executorPath, "" + socket.getLocalPort());
+ pb.command(FLINK_PYTHON3_BINARY_PATH, "-O", "-B", executorPath, "" + server.getLocalPort());
} else {
try {
Runtime.getRuntime().exec(FLINK_PYTHON2_BINARY_PATH);
} catch (IOException ex) {
throw new RuntimeException(FLINK_PYTHON2_BINARY_KEY + "=" + FLINK_PYTHON2_BINARY_PATH + " does not point to a valid python binary.");
}
- pb.command(FLINK_PYTHON2_BINARY_PATH, "-O", "-B", executorPath, "" + socket.getLocalPort());
+ pb.command(FLINK_PYTHON2_BINARY_PATH, "-O", "-B", executorPath, "" + server.getLocalPort());
}
if (debug) {
socket.setSoTimeout(0);
LOG.info("Waiting for Python Process : " + function.getRuntimeContext().getTaskName()
- + " Run python /tmp/flink" + FLINK_PYTHON_EXECUTOR_NAME + " " + socket.getLocalPort());
+ + " Run python /tmp/flink" + FLINK_PYTHON_EXECUTOR_NAME + " " + server.getLocalPort());
} else {
process = pb.start();
new StreamPrinter(process.getInputStream()).start();
new StreamPrinter(process.getErrorStream(), true, msg).start();
}
- byte[] executorPort = new byte[4];
- socket.receive(new DatagramPacket(executorPort, 0, 4));
- int exPort = getInt(executorPort, 0);
- if (exPort == -2) {
- try { //wait before terminating to ensure that the complete error message is printed
- Thread.sleep(2000);
- } catch (InterruptedException ex) {
+
+ shutdownThread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ destroyProcess();
+ } catch (IOException ex) {
+ }
}
- throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely." + msg);
- }
+ };
+
+ Runtime.getRuntime().addShutdownHook(shutdownThread);
+
+ socket = server.accept();
+ in = socket.getInputStream();
+ out = socket.getOutputStream();
byte[] opSize = new byte[4];
putInt(opSize, 0, operator.length);
- socket.send(new DatagramPacket(opSize, 0, 4, host, exPort));
- socket.send(new DatagramPacket(operator, 0, operator.length, host, exPort));
+ out.write(opSize, 0, 4);
+ out.write(operator, 0, operator.length);
byte[] meta = importString.toString().getBytes("utf-8");
putInt(opSize, 0, meta.length);
- socket.send(new DatagramPacket(opSize, 0, 4, host, exPort));
- socket.send(new DatagramPacket(meta, 0, meta.length, host, exPort));
+ out.write(opSize, 0, 4);
+ out.write(meta, 0, meta.length);
byte[] input = inputFilePath.getBytes("utf-8");
putInt(opSize, 0, input.length);
- socket.send(new DatagramPacket(opSize, 0, 4, host, exPort));
- socket.send(new DatagramPacket(input, 0, input.length, host, exPort));
+ out.write(opSize, 0, 4);
+ out.write(input, 0, input.length);
byte[] output = outputFilePath.getBytes("utf-8");
putInt(opSize, 0, output.length);
- socket.send(new DatagramPacket(opSize, 0, 4, host, exPort));
- socket.send(new DatagramPacket(output, 0, output.length, host, exPort));
+ out.write(opSize, 0, 4);
+ out.write(output, 0, output.length);
+
+ out.flush();
try { // wait a bit to catch syntax errors
Thread.sleep(2000);
@@ -165,9 +174,30 @@ public class PythonStreamer extends Streamer {
LOG.error("Exception occurred while closing Streamer. :" + e.getMessage());
}
if (!debug) {
- try {
- process.exitValue();
- } catch (IllegalThreadStateException ise) { //process still active
+ destroyProcess();
+ }
+ if (shutdownThread != null) {
+ Runtime.getRuntime().removeShutdownHook(shutdownThread);
+ }
+ }
+
+ private void destroyProcess() throws IOException {
+ try {
+ process.exitValue();
+ } catch (IllegalThreadStateException ise) { //process still active
+ if (process.getClass().getName().equals("java.lang.UNIXProcess")) {
+ int pid;
+ try {
+ Field f = process.getClass().getDeclaredField("pid");
+ f.setAccessible(true);
+ pid = f.getInt(process);
+ } catch (Throwable e) {
+ process.destroy();
+ return;
+ }
+ String[] args = new String[]{"kill", "-9", "" + pid};
+ Runtime.getRuntime().exec(args);
+ } else {
process.destroy();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__diff.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__diff.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__diff.py
index cebdc5d..79301a6 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__diff.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__diff.py
@@ -1,21 +1,3 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
#!/usr/bin/env python
#
# Author: Mike McKerns (mmckerns @caltech and @uqfoundation)
http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__init__.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__init__.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__init__.py
index b4cbc34..b03eda9 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__init__.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__init__.py
@@ -1,21 +1,3 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
#!/usr/bin/env python
#
# Author: Mike McKerns (mmckerns @caltech and @uqfoundation)
http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/_objects.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/_objects.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/_objects.py
index 457ae1e..b89bc0e 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/_objects.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/_objects.py
@@ -1,21 +1,3 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
#!/usr/bin/env python
#
# Author: Mike McKerns (mmckerns @caltech and @uqfoundation)
http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/detect.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/detect.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/detect.py
index 76357a5..749a573 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/detect.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/detect.py
@@ -1,21 +1,3 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
#!/usr/bin/env python
#
# Author: Mike McKerns (mmckerns @caltech and @uqfoundation)
http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/dill.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/dill.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/dill.py
index 238527c..cddb9ca 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/dill.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/dill.py
@@ -1,21 +1,3 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
# -*- coding: utf-8 -*-
#
# Author: Mike McKerns (mmckerns @caltech and @uqfoundation)
http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/info.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/info.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/info.py
deleted file mode 100644
index 65b48d4..0000000
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/info.py
+++ /dev/null
@@ -1,17 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/objtypes.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/objtypes.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/objtypes.py
index a6f159e..bf0b557 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/objtypes.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/objtypes.py
@@ -1,21 +1,3 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
#!/usr/bin/env python
#
# Author: Mike McKerns (mmckerns @caltech and @uqfoundation)
http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/pointers.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/pointers.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/pointers.py
index 2cad3be..25714ea 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/pointers.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/pointers.py
@@ -1,21 +1,3 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
#!/usr/bin/env python
#
# Author: Mike McKerns (mmckerns @caltech and @uqfoundation)
http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/source.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/source.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/source.py
index b51c007..b55ca55 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/source.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/source.py
@@ -1,21 +1,3 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
# #!/usr/bin/env python
#
# Author: Mike McKerns (mmckerns @caltech and @uqfoundation)
http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/temp.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/temp.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/temp.py
index 827efda..9dedb41 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/temp.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/temp.py
@@ -1,21 +1,3 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
#!/usr/bin/env python
#
# Author: Mike McKerns (mmckerns @caltech and @uqfoundation)
http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/executor.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/executor.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/executor.py
index 9d80c82..2cfb9d3 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/executor.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/executor.py
@@ -20,39 +20,35 @@ import socket
import struct
#argv[1] = port
-
+s = None  # stays None until the TCP socket below is created; checked in the except branch
try:
import dill
port = int(sys.argv[1])
- s1 = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
- s1.bind((socket.gethostbyname("localhost"), 0))
- s1.sendto(struct.pack(">i", s1.getsockname()[1]), (socket.gethostbyname("localhost"), port))
+ s = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
+ s.connect((socket.gethostbyname("localhost"), port))
- size = struct.unpack(">i", s1.recv(4))[0]
- serialized_operator = s1.recv(size)
+ size = struct.unpack(">i", s.recv(4, socket.MSG_WAITALL))[0]
+ serialized_operator = s.recv(size, socket.MSG_WAITALL)
- size = struct.unpack(">i", s1.recv(4))[0]
- import_string = s1.recv(size).decode("utf-8")
+ size = struct.unpack(">i", s.recv(4, socket.MSG_WAITALL))[0]
+ import_string = s.recv(size, socket.MSG_WAITALL).decode("utf-8")
- size = struct.unpack(">i", s1.recv(4))[0]
- input_file = s1.recv(size).decode("utf-8")
+ size = struct.unpack(">i", s.recv(4, socket.MSG_WAITALL))[0]
+ input_file = s.recv(size, socket.MSG_WAITALL).decode("utf-8")
- size = struct.unpack(">i", s1.recv(4))[0]
- output_file = s1.recv(size).decode("utf-8")
+ size = struct.unpack(">i", s.recv(4, socket.MSG_WAITALL))[0]
+ output_file = s.recv(size, socket.MSG_WAITALL).decode("utf-8")
exec(import_string)
operator = dill.loads(serialized_operator)
- operator._configure(input_file, output_file, port)
+ operator._configure(input_file, output_file, s)
operator._go()
sys.stdout.flush()
sys.stderr.flush()
except:
sys.stdout.flush()
sys.stderr.flush()
- s = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
- s.bind((socket.gethostbyname("localhost"), 0))
- destination = (socket.gethostbyname("localhost"), int(sys.argv[1]))
- s.sendto(struct.pack(">i", -2), destination)
+ if s is not None: s.sendall(struct.pack(">i", -2))  # no socket yet -> nothing to notify; never mask the original error
raise
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py
index 7d64352..cffe79e 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
import mmap
-import socket
+import socket as SOCKET
import tempfile
from struct import pack, unpack
from collections import deque
@@ -61,18 +61,12 @@ class OneWayBusyBufferingMappedFileConnection(object):
class BufferingUDPMappedFileConnection(object):
- def __init__(self, input_file=tempfile.gettempdir() + "/flink_data/input", output_file=tempfile.gettempdir() + "/flink_data/output", port=25000):
+ def __init__(self, input_file=tempfile.gettempdir() + "/flink_data/input", output_file=tempfile.gettempdir() + "/flink_data/output", socket=None):
self._input_file = open(input_file, "rb+")
self._output_file = open(output_file, "rb+")
self._file_input_buffer = mmap.mmap(self._input_file.fileno(), MAPPED_FILE_SIZE, mmap.MAP_SHARED, mmap.ACCESS_READ)
self._file_output_buffer = mmap.mmap(self._output_file.fileno(), MAPPED_FILE_SIZE, mmap.MAP_SHARED, mmap.ACCESS_WRITE)
- self._socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
- self._socket.bind((socket.gethostbyname("localhost"), 0))
- self._socket.settimeout(300)
- self._destination = (socket.gethostbyname("localhost"), port)
-
- self._socket.sendto(pack(">I", self._socket.getsockname()[1]), self._destination)
- self._socket.sendto(pack(">I", self._socket.getsockname()[1]), self._destination)
+ self._socket = socket  # NOTE(review): presumably the connected TCP socket executor.py hands to _configure(); None will fail on first use
self._out = deque()
self._out_size = 0
@@ -97,10 +91,10 @@ class BufferingUDPMappedFileConnection(object):
def _write_buffer(self):
self._file_output_buffer.seek(0, 0)
self._file_output_buffer.write(b"".join(self._out))
- self._socket.sendto(pack(">i", self._out_size), self._destination)
+ self._socket.sendall(pack(">i", self._out_size))  # sendall: send() on a stream socket may write only part of the buffer
self._out.clear()
self._out_size = 0
- self._socket.recvfrom(1)
+ self._socket.recv(1, SOCKET.MSG_WAITALL)
def read(self, des_size, ignored=None):
if self._input_size == self._input_offset:
@@ -110,10 +104,10 @@ class BufferingUDPMappedFileConnection(object):
return self._input[old_offset:self._input_offset]
def _read_buffer(self):
- self._socket.sendto(SIGNAL_REQUEST_BUFFER, self._destination)
+ self._socket.sendall(SIGNAL_REQUEST_BUFFER)
self._file_input_buffer.seek(0, 0)
self._input_offset = 0
- meta_size = self._socket.recvfrom(5)[0]
+ meta_size = self._socket.recv(5, SOCKET.MSG_WAITALL)
self._input_size = unpack(">I", meta_size[:4])[0]
self._was_last = meta_size[4] == SIGNAL_WAS_LAST
self._input = self._file_input_buffer.read(self._input_size)
@@ -121,7 +115,7 @@ class BufferingUDPMappedFileConnection(object):
def send_end_signal(self):
if self._out_size:
self._write_buffer()
- self._socket.sendto(SIGNAL_FINISHED, self._destination)
+ self._socket.sendall(SIGNAL_FINISHED)
def has_next(self, ignored=None):
return not self._was_last or not self._input_size == self._input_offset
@@ -134,8 +128,8 @@ class BufferingUDPMappedFileConnection(object):
class TwinBufferingUDPMappedFileConnection(BufferingUDPMappedFileConnection):
- def __init__(self, input_file=tempfile.gettempdir() + "/flink/data/input", output_file=tempfile.gettempdir() + "/flink/data/output", port=25000):
- super(TwinBufferingUDPMappedFileConnection, self).__init__(input_file, output_file, port)
+ def __init__(self, input_file=tempfile.gettempdir() + "/flink/data/input", output_file=tempfile.gettempdir() + "/flink/data/output", socket=None):
+ super(TwinBufferingUDPMappedFileConnection, self).__init__(input_file, output_file, socket)
self._input = ["", ""]
self._input_offset = [0, 0]
self._input_size = [0, 0]
@@ -150,12 +144,12 @@ class TwinBufferingUDPMappedFileConnection(BufferingUDPMappedFileConnection):
def _read_buffer(self, group):
if group:
- self._socket.sendto(SIGNAL_REQUEST_BUFFER_G1, self._destination)
+ self._socket.sendall(SIGNAL_REQUEST_BUFFER_G1)
else:
- self._socket.sendto(SIGNAL_REQUEST_BUFFER_G0, self._destination)
+ self._socket.sendall(SIGNAL_REQUEST_BUFFER_G0)
self._file_input_buffer.seek(0, 0)
self._input_offset[group] = 0
- meta_size = self._socket.recvfrom(5)[0]
+ meta_size = self._socket.recv(5, SOCKET.MSG_WAITALL)
self._input_size[group] = unpack(">I", meta_size[:4])[0]
self._was_last[group] = meta_size[4] == SIGNAL_WAS_LAST
self._input[group] = self._file_input_buffer.read(self._input_size[group])
http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Iterator.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Iterator.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Iterator.py
index 7b1c5c5..fb0e26d 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Iterator.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Iterator.py
@@ -260,7 +260,7 @@ class ByteArrayDeserializer(object):
def deserialize(self):
size = unpack(">i", self.read(4, self._group))[0]
- return bytearray(self.read(size, self._group))
+ return bytearray(self.read(size, self._group)) if size else bytearray(b"")  # zero-length payload: skip read(); presumably avoids a needless buffer request
class BooleanDeserializer(object):
@@ -315,7 +315,7 @@ class StringDeserializer(object):
def deserialize(self):
length = unpack(">i", self.read(4, self._group))[0]
- return self.read(length, self._group).decode("utf-8")
+ return self.read(length, self._group).decode("utf-8") if length else ""  # empty string: skip read() of zero bytes
class NullDeserializer(object):
http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.pyc
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.pyc b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.pyc
deleted file mode 100644
index 7f3cf94..0000000
Binary files a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.pyc and /dev/null differ
http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/GroupReduceFunction.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/GroupReduceFunction.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/GroupReduceFunction.py
index 35359e2..0ce23ff 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/GroupReduceFunction.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/GroupReduceFunction.py
@@ -103,7 +103,7 @@ class GroupReduceFunction(Function.Function):
keys.sort()
for key in keys:
values = grouping[key]
- for op in self._sort_ops:
+ for op in reversed(self._sort_ops):  # list.sort is stable: sorting by the last key first leaves the first sort op as the primary order
values.sort(key=lambda x:x[op[0]], reverse = op[1] == Order.DESCENDING)
result = function(Iterator.ListIterator(values), collector)
if result is not None:
http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index fef5a9e..4232fb7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -717,6 +717,8 @@ under the License.
<exclude>flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/*.java</exclude>
<exclude>flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_csv</exclude>
<exclude>flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_text</exclude>
+ <!-- Python: bundled third-party dill sources, which carry no ASF license header -->
+ <exclude>flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/**</exclude>
<!-- Configuration Files. -->
<exclude>**/flink-bin/conf/slaves</exclude>
<exclude>flink-contrib/docker-flink/flink/conf/slaves</exclude>