You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/03/17 07:02:53 UTC
flink git commit: [FLINK-5650] [py] Continuously check PyProcess health while waiting for incoming connection
Repository: flink
Updated Branches:
refs/heads/release-1.2 9d59e008d -> edd1065c7
[FLINK-5650] [py] Continuously check PyProcess health while waiting for incoming connection
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/edd1065c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/edd1065c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/edd1065c
Branch: refs/heads/release-1.2
Commit: edd1065c73f3600929b1f731bc0b66255a56d26d
Parents: 9d59e00
Author: zentol <ch...@apache.org>
Authored: Wed Mar 15 17:06:12 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Thu Mar 16 17:09:16 2017 +0100
----------------------------------------------------------------------
.../api/streaming/data/PythonStreamer.java | 23 ++++++++++++-
.../api/streaming/plan/PythonPlanStreamer.java | 35 ++++++++++++++------
2 files changed, 46 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/edd1065c/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index 10aded8..bab4904 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -89,6 +89,7 @@ public class PythonStreamer implements Serializable {
*/
public void open() throws IOException {
server = new ServerSocket(0);
+ server.setSoTimeout(50);
startPython();
}
@@ -145,11 +146,31 @@ public class PythonStreamer implements Serializable {
} catch (IllegalThreadStateException ise) { //process still active -> start receiving data
}
- socket = server.accept();
+ while (true) {
+ try {
+ socket = server.accept();
+ break;
+ } catch (SocketTimeoutException ignored) {
+ checkPythonProcessHealth();
+ }
+ }
in = new DataInputStream(socket.getInputStream());
out = new DataOutputStream(socket.getOutputStream());
}
+ private void checkPythonProcessHealth() {
+ try {
+ int value = process.exitValue();
+ if (value != 0) {
+ throw new RuntimeException("Plan file caused an error. Check log-files for details.");
+ }
+ if (value == 0) {
+ throw new RuntimeException("Plan file exited prematurely without an error.");
+ }
+ } catch (IllegalThreadStateException ise) {//Process still running
+ }
+ }
+
/**
* Closes this streamer.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/edd1065c/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
index ecbc7f4..eafbdc1 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
@@ -16,6 +16,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.net.ServerSocket;
import java.net.Socket;
+import java.net.SocketTimeoutException;
import org.apache.flink.python.api.streaming.util.StreamPrinter;
import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH;
import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH;
@@ -47,8 +48,16 @@ public class PythonPlanStreamer implements Serializable {
public void open(String tmpPath, String args) throws IOException {
server = new ServerSocket(0);
+ server.setSoTimeout(50);
startPython(tmpPath, args);
- socket = server.accept();
+ while (true) {
+ try {
+ socket = server.accept();
+ break;
+ } catch (SocketTimeoutException ignored) {
+ checkPythonProcessHealth();
+ }
+ }
sender = new PythonPlanSender(socket.getOutputStream());
receiver = new PythonPlanReceiver(socket.getInputStream());
}
@@ -71,16 +80,7 @@ public class PythonPlanStreamer implements Serializable {
} catch (InterruptedException ex) {
}
- try {
- int value = process.exitValue();
- if (value != 0) {
- throw new RuntimeException("Plan file caused an error. Check log-files for details.");
- }
- if (value == 0) {
- throw new RuntimeException("Plan file exited prematurely without an error.");
- }
- } catch (IllegalThreadStateException ise) {//Process still running
- }
+ checkPythonProcessHealth();
process.getOutputStream().write("plan\n".getBytes());
process.getOutputStream().write((server.getLocalPort() + "\n").getBytes());
@@ -95,4 +95,17 @@ public class PythonPlanStreamer implements Serializable {
process.destroy();
}
}
+
+ private void checkPythonProcessHealth() {
+ try {
+ int value = process.exitValue();
+ if (value != 0) {
+ throw new RuntimeException("Plan file caused an error. Check log-files for details.");
+ }
+ if (value == 0) {
+ throw new RuntimeException("Plan file exited prematurely without an error.");
+ }
+ } catch (IllegalThreadStateException ise) {//Process still running
+ }
+ }
}