You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/03/18 15:13:08 UTC

flink git commit: [FLINK-6111] [py] Remove unnecessary sleeps

Repository: flink
Updated Branches:
  refs/heads/master da10a9b5f -> 3e767b5a4


[FLINK-6111] [py] Remove unnecessary sleeps


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3e767b5a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3e767b5a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3e767b5a

Branch: refs/heads/master
Commit: 3e767b5a4a0846c8d53a8167d6236dbd9657eb2f
Parents: da10a9b
Author: zentol <ch...@apache.org>
Authored: Sat Mar 18 13:10:32 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Sat Mar 18 16:11:36 2017 +0100

----------------------------------------------------------------------
 .../api/streaming/data/PythonStreamer.java      | 43 ++++++++++++--------
 .../api/streaming/plan/PythonPlanStreamer.java  |  7 ----
 2 files changed, 25 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3e767b5a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index 56ebf5b..3409960 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -75,6 +75,9 @@ public class PythonStreamer<IN1, IN2, OUT> implements Serializable {
 
 	protected final AbstractRichFunction function;
 
+	protected transient Thread outPrinter;
+	protected transient Thread errorPrinter;
+
 	public PythonStreamer(AbstractRichFunction function, int id, boolean usesByteArray) {
 		this.id = id;
 		this.usePython3 = PythonPlanBinder.usePython3;
@@ -114,8 +117,10 @@ public class PythonStreamer<IN1, IN2, OUT> implements Serializable {
 		}
 
 		process = Runtime.getRuntime().exec(pythonBinaryPath + " -O -B " + planPath + planArguments);
-		new StreamPrinter(process.getInputStream()).start();
-		new StreamPrinter(process.getErrorStream(), true, msg).start();
+		outPrinter = new StreamPrinter(process.getInputStream());
+		outPrinter.start();
+		errorPrinter = new StreamPrinter(process.getErrorStream(), true, msg);
+		errorPrinter.start();
 
 		shutdownThread = new Thread() {
 			@Override
@@ -139,16 +144,6 @@ public class PythonStreamer<IN1, IN2, OUT> implements Serializable {
 		processOutput.write((outputFilePath + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
 		processOutput.flush();
 
-		try { // wait a bit to catch syntax errors
-			Thread.sleep(2000);
-		} catch (InterruptedException ignored) {
-		}
-		try {
-			process.exitValue();
-			throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely." + msg);
-		} catch (IllegalThreadStateException ignored) { //process still active -> start receiving data
-		}
-
 		while (true) {
 			try {
 				socket = server.accept();
@@ -285,9 +280,15 @@ public class PythonStreamer<IN1, IN2, OUT> implements Serializable {
 						case SIGNAL_FINISHED:
 							return;
 						case SIGNAL_ERROR:
-							try { //wait before terminating to ensure that the complete error message is printed
-								Thread.sleep(2000);
-							} catch (InterruptedException ignored) {
+							try {
+								outPrinter.join(1000);
+							} catch (InterruptedException e) {
+								outPrinter.interrupt();
+							}
+							try {
+								errorPrinter.join(1000);
+							} catch (InterruptedException e) {
+								errorPrinter.interrupt();
 							}
 							throw new RuntimeException(
 									"External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely due to an error." + msg);
@@ -333,9 +334,15 @@ public class PythonStreamer<IN1, IN2, OUT> implements Serializable {
 						case SIGNAL_FINISHED:
 							return;
 						case SIGNAL_ERROR:
-							try { //wait before terminating to ensure that the complete error message is printed
-								Thread.sleep(2000);
-							} catch (InterruptedException ignored) {
+							try {
+								outPrinter.join(1000);
+							} catch (InterruptedException e) {
+								outPrinter.interrupt();
+							}
+							try {
+								errorPrinter.join(1000);
+							} catch (InterruptedException e) {
+								errorPrinter.interrupt();
 							}
 							throw new RuntimeException(
 									"External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely due to an error." + msg);

http://git-wip-us.apache.org/repos/asf/flink/blob/3e767b5a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
index c27776b..d97cf69 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
@@ -82,13 +82,6 @@ public class PythonPlanStreamer {
 		new StreamPrinter(process.getInputStream()).start();
 		new StreamPrinter(process.getErrorStream()).start();
 
-		try {
-			Thread.sleep(2000);
-		} catch (InterruptedException ignored) {
-		}
-
-		checkPythonProcessHealth();
-
 		process.getOutputStream().write("plan\n".getBytes(ConfigConstants.DEFAULT_CHARSET));
 		process.getOutputStream().write((server.getLocalPort() + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
 		process.getOutputStream().flush();