You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/03/18 15:13:08 UTC
flink git commit: [FLINK-6111] [py] Remove unnecessary sleeps
Repository: flink
Updated Branches:
refs/heads/master da10a9b5f -> 3e767b5a4
[FLINK-6111] [py] Remove unnecessary sleeps
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3e767b5a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3e767b5a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3e767b5a
Branch: refs/heads/master
Commit: 3e767b5a4a0846c8d53a8167d6236dbd9657eb2f
Parents: da10a9b
Author: zentol <ch...@apache.org>
Authored: Sat Mar 18 13:10:32 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Sat Mar 18 16:11:36 2017 +0100
----------------------------------------------------------------------
.../api/streaming/data/PythonStreamer.java | 43 ++++++++++++--------
.../api/streaming/plan/PythonPlanStreamer.java | 7 ----
2 files changed, 25 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3e767b5a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index 56ebf5b..3409960 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -75,6 +75,9 @@ public class PythonStreamer<IN1, IN2, OUT> implements Serializable {
protected final AbstractRichFunction function;
+ protected transient Thread outPrinter;
+ protected transient Thread errorPrinter;
+
public PythonStreamer(AbstractRichFunction function, int id, boolean usesByteArray) {
this.id = id;
this.usePython3 = PythonPlanBinder.usePython3;
@@ -114,8 +117,10 @@ public class PythonStreamer<IN1, IN2, OUT> implements Serializable {
}
process = Runtime.getRuntime().exec(pythonBinaryPath + " -O -B " + planPath + planArguments);
- new StreamPrinter(process.getInputStream()).start();
- new StreamPrinter(process.getErrorStream(), true, msg).start();
+ outPrinter = new StreamPrinter(process.getInputStream());
+ outPrinter.start();
+ errorPrinter = new StreamPrinter(process.getErrorStream(), true, msg);
+ errorPrinter.start();
shutdownThread = new Thread() {
@Override
@@ -139,16 +144,6 @@ public class PythonStreamer<IN1, IN2, OUT> implements Serializable {
processOutput.write((outputFilePath + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
processOutput.flush();
- try { // wait a bit to catch syntax errors
- Thread.sleep(2000);
- } catch (InterruptedException ignored) {
- }
- try {
- process.exitValue();
- throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely." + msg);
- } catch (IllegalThreadStateException ignored) { //process still active -> start receiving data
- }
-
while (true) {
try {
socket = server.accept();
@@ -285,9 +280,15 @@ public class PythonStreamer<IN1, IN2, OUT> implements Serializable {
case SIGNAL_FINISHED:
return;
case SIGNAL_ERROR:
- try { //wait before terminating to ensure that the complete error message is printed
- Thread.sleep(2000);
- } catch (InterruptedException ignored) {
+ try {
+ outPrinter.join(1000);
+ } catch (InterruptedException e) {
+ outPrinter.interrupt();
+ }
+ try {
+ errorPrinter.join(1000);
+ } catch (InterruptedException e) {
+ errorPrinter.interrupt();
}
throw new RuntimeException(
"External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely due to an error." + msg);
@@ -333,9 +334,15 @@ public class PythonStreamer<IN1, IN2, OUT> implements Serializable {
case SIGNAL_FINISHED:
return;
case SIGNAL_ERROR:
- try { //wait before terminating to ensure that the complete error message is printed
- Thread.sleep(2000);
- } catch (InterruptedException ignored) {
+ try {
+ outPrinter.join(1000);
+ } catch (InterruptedException e) {
+ outPrinter.interrupt();
+ }
+ try {
+ errorPrinter.join(1000);
+ } catch (InterruptedException e) {
+ errorPrinter.interrupt();
}
throw new RuntimeException(
"External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely due to an error." + msg);
http://git-wip-us.apache.org/repos/asf/flink/blob/3e767b5a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
index c27776b..d97cf69 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
@@ -82,13 +82,6 @@ public class PythonPlanStreamer {
new StreamPrinter(process.getInputStream()).start();
new StreamPrinter(process.getErrorStream()).start();
- try {
- Thread.sleep(2000);
- } catch (InterruptedException ignored) {
- }
-
- checkPythonProcessHealth();
-
process.getOutputStream().write("plan\n".getBytes(ConfigConstants.DEFAULT_CHARSET));
process.getOutputStream().write((server.getLocalPort() + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
process.getOutputStream().flush();