You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/03/17 07:02:53 UTC

flink git commit: [FLINK-5650] [py] Continuously check PyProcess health while waiting for incoming connection

Repository: flink
Updated Branches:
  refs/heads/release-1.2 9d59e008d -> edd1065c7


[FLINK-5650] [py] Continuously check PyProcess health while waiting for incoming connection


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/edd1065c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/edd1065c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/edd1065c

Branch: refs/heads/release-1.2
Commit: edd1065c73f3600929b1f731bc0b66255a56d26d
Parents: 9d59e00
Author: zentol <ch...@apache.org>
Authored: Wed Mar 15 17:06:12 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Thu Mar 16 17:09:16 2017 +0100

----------------------------------------------------------------------
 .../api/streaming/data/PythonStreamer.java      | 23 ++++++++++++-
 .../api/streaming/plan/PythonPlanStreamer.java  | 35 ++++++++++++++------
 2 files changed, 46 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/edd1065c/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index 10aded8..bab4904 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -89,6 +89,7 @@ public class PythonStreamer implements Serializable {
 	 */
 	public void open() throws IOException {
 		server = new ServerSocket(0);
+		server.setSoTimeout(50);
 		startPython();
 	}
 
@@ -145,11 +146,31 @@ public class PythonStreamer implements Serializable {
 		} catch (IllegalThreadStateException ise) { //process still active -> start receiving data
 		}
 
-		socket = server.accept();
+		while (true) {
+			try {
+				socket = server.accept();
+				break;
+			} catch (SocketTimeoutException ignored) {
+				checkPythonProcessHealth();
+			}
+		}
 		in = new DataInputStream(socket.getInputStream());
 		out = new DataOutputStream(socket.getOutputStream());
 	}
 
+	private void checkPythonProcessHealth() {
+		try {
+			int value = process.exitValue();
+			if (value != 0) {
+				throw new RuntimeException("Plan file caused an error. Check log-files for details.");
+			}
+			if (value == 0) {
+				throw new RuntimeException("Plan file exited prematurely without an error.");
+			}
+		} catch (IllegalThreadStateException ise) {//Process still running
+		}
+	}
+
 	/**
 	 * Closes this streamer.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/edd1065c/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
index ecbc7f4..eafbdc1 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
@@ -16,6 +16,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.net.SocketTimeoutException;
 import org.apache.flink.python.api.streaming.util.StreamPrinter;
 import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH;
 import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH;
@@ -47,8 +48,16 @@ public class PythonPlanStreamer implements Serializable {
 
 	public void open(String tmpPath, String args) throws IOException {
 		server = new ServerSocket(0);
+		server.setSoTimeout(50);
 		startPython(tmpPath, args);
-		socket = server.accept();
+		while (true) {
+			try {
+				socket = server.accept();
+				break;
+			} catch (SocketTimeoutException ignored) {
+				checkPythonProcessHealth();
+			}
+		}
 		sender = new PythonPlanSender(socket.getOutputStream());
 		receiver = new PythonPlanReceiver(socket.getInputStream());
 	}
@@ -71,16 +80,7 @@ public class PythonPlanStreamer implements Serializable {
 		} catch (InterruptedException ex) {
 		}
 
-		try {
-			int value = process.exitValue();
-			if (value != 0) {
-				throw new RuntimeException("Plan file caused an error. Check log-files for details.");
-			}
-			if (value == 0) {
-				throw new RuntimeException("Plan file exited prematurely without an error.");
-			}
-		} catch (IllegalThreadStateException ise) {//Process still running
-		}
+		checkPythonProcessHealth();
 
 		process.getOutputStream().write("plan\n".getBytes());
 		process.getOutputStream().write((server.getLocalPort() + "\n").getBytes());
@@ -95,4 +95,17 @@ public class PythonPlanStreamer implements Serializable {
 			process.destroy();
 		}
 	}
+
+	private void checkPythonProcessHealth() {
+		try {
+			int value = process.exitValue();
+			if (value != 0) {
+				throw new RuntimeException("Plan file caused an error. Check log-files for details.");
+			}
+			if (value == 0) {
+				throw new RuntimeException("Plan file exited prematurely without an error.");
+			}
+		} catch (IllegalThreadStateException ise) {//Process still running
+		}
+	}
 }