You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/03/16 16:05:46 UTC
[1/2] flink git commit: [refactor] [py] General code clean-up
Repository: flink
Updated Branches:
refs/heads/master 03889ae1f -> 30bb958a7
[refactor] [py] General code clean-up
- added missing serialVersionUID
- PythonPlan* classes no longer implement Serializable
- marked several fields as transient
- replaced some fields by local variables
- input/output files are now deleted when they are closed
- removed some unnecessary casts
- renamed the variables of several ignored exceptions to "ignored"
- close socket in PythonPlanStreamer
- simplified PyProcess health check
- eliminated several uses of raw types
- removed some unthrown exception declarations
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/30bb958a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/30bb958a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/30bb958a
Branch: refs/heads/master
Commit: 30bb958a73e74ced94548897962740a59677dc12
Parents: c2bb5de
Author: zentol <ch...@apache.org>
Authored: Thu Mar 16 13:24:21 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Thu Mar 16 17:03:38 2017 +0100
----------------------------------------------------------------------
.../python/api/functions/PythonCoGroup.java | 11 ++--
.../api/functions/PythonMapPartition.java | 11 ++--
.../api/streaming/data/PythonReceiver.java | 22 +++----
.../python/api/streaming/data/PythonSender.java | 47 ++++++++-------
.../api/streaming/data/PythonStreamer.java | 60 ++++++++++----------
.../api/streaming/plan/PythonPlanReceiver.java | 12 ++--
.../api/streaming/plan/PythonPlanSender.java | 4 +-
.../api/streaming/plan/PythonPlanStreamer.java | 27 ++++++---
8 files changed, 106 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/30bb958a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
index 33d88c3..9da5a4c 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
@@ -27,13 +27,16 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
* @param <IN2>
* @param <OUT>
*/
-public class PythonCoGroup<IN1, IN2, OUT> extends RichCoGroupFunction<IN1, IN2, OUT> implements ResultTypeQueryable {
- private final PythonStreamer streamer;
- private transient final TypeInformation<OUT> typeInformation;
+public class PythonCoGroup<IN1, IN2, OUT> extends RichCoGroupFunction<IN1, IN2, OUT> implements ResultTypeQueryable<OUT> {
+
+ private static final long serialVersionUID = -3997396583317513873L;
+
+ private final PythonStreamer<IN1, IN2, OUT> streamer;
+ private final transient TypeInformation<OUT> typeInformation;
public PythonCoGroup(int id, TypeInformation<OUT> typeInformation) {
this.typeInformation = typeInformation;
- streamer = new PythonStreamer(this, id, true);
+ streamer = new PythonStreamer<>(this, id, true);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/30bb958a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
index 6282210..c596d6c 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
@@ -28,13 +28,16 @@ import org.apache.flink.util.Collector;
* @param <IN>
* @param <OUT>
*/
-public class PythonMapPartition<IN, OUT> extends RichMapPartitionFunction<IN, OUT> implements ResultTypeQueryable {
- private final PythonStreamer streamer;
- private transient final TypeInformation<OUT> typeInformation;
+public class PythonMapPartition<IN, OUT> extends RichMapPartitionFunction<IN, OUT> implements ResultTypeQueryable<OUT>{
+
+ private static final long serialVersionUID = 3866306483023916413L;
+
+ private final PythonStreamer<IN, IN, OUT> streamer;
+ private final transient TypeInformation<OUT> typeInformation;
public PythonMapPartition(int id, TypeInformation<OUT> typeInformation) {
this.typeInformation = typeInformation;
- streamer = new PythonStreamer(this, id, typeInformation instanceof PrimitiveArrayTypeInfo);
+ streamer = new PythonStreamer<>(this, id, typeInformation instanceof PrimitiveArrayTypeInfo);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/30bb958a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
index 83de746..ba5d96a 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
@@ -13,7 +13,6 @@
package org.apache.flink.python.api.streaming.data;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.Serializable;
@@ -28,17 +27,17 @@ import org.apache.flink.util.Collector;
/**
* This class is used to read data from memory-mapped files.
*/
-public class PythonReceiver implements Serializable {
+public class PythonReceiver<OUT> implements Serializable {
private static final long serialVersionUID = -2474088929850009968L;
- private File inputFile;
- private RandomAccessFile inputRAF;
- private FileChannel inputChannel;
- private MappedByteBuffer fileBuffer;
+ private transient File inputFile;
+ private transient RandomAccessFile inputRAF;
+ private transient FileChannel inputChannel;
+ private transient MappedByteBuffer fileBuffer;
private final boolean readAsByteArray;
- private Deserializer<?> deserializer = null;
+ private transient Deserializer<OUT> deserializer;
public PythonReceiver(boolean usesByteArray) {
readAsByteArray = usesByteArray;
@@ -47,10 +46,10 @@ public class PythonReceiver implements Serializable {
//=====Setup========================================================================================================
public void open(String path) throws IOException {
setupMappedFile(path);
- deserializer = readAsByteArray ? new ByteArrayDeserializer() : new TupleDeserializer();
+ deserializer = (Deserializer<OUT>) (readAsByteArray ? new ByteArrayDeserializer() : new TupleDeserializer());
}
- private void setupMappedFile(String inputFilePath) throws FileNotFoundException, IOException {
+ private void setupMappedFile(String inputFilePath) throws IOException {
File x = new File(FLINK_TMP_DATA_DIR);
x.mkdirs();
@@ -75,6 +74,7 @@ public class PythonReceiver implements Serializable {
private void closeMappedFile() throws IOException {
inputChannel.close();
inputRAF.close();
+ inputFile.delete();
}
@@ -89,7 +89,7 @@ public class PythonReceiver implements Serializable {
* @throws IOException
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
- public void collectBuffer(Collector c, int bufferSize) throws IOException {
+ public void collectBuffer(Collector<OUT> c, int bufferSize) throws IOException {
fileBuffer.position(0);
while (fileBuffer.position() < bufferSize) {
@@ -99,7 +99,7 @@ public class PythonReceiver implements Serializable {
//=====Deserializer=================================================================================================
private interface Deserializer<T> {
- public T deserialize();
+ T deserialize();
}
private class ByteArrayDeserializer implements Deserializer<byte[]> {
http://git-wip-us.apache.org/repos/asf/flink/blob/30bb958a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
index c371e9d..8c40a6f 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
@@ -13,7 +13,6 @@
package org.apache.flink.python.api.streaming.data;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.Serializable;
@@ -29,26 +28,31 @@ import static org.apache.flink.python.api.PythonPlanBinder.MAPPED_FILE_SIZE;
/**
* General-purpose class to write data to memory-mapped files.
*/
-public class PythonSender<IN> implements Serializable {
- public static final byte TYPE_ARRAY = (byte) 63;
- public static final byte TYPE_KEY_VALUE = (byte) 62;
- public static final byte TYPE_VALUE_VALUE = (byte) 61;
+public class PythonSender implements Serializable {
- private File outputFile;
- private RandomAccessFile outputRAF;
- private FileChannel outputChannel;
- private MappedByteBuffer fileBuffer;
+ private static final long serialVersionUID = -2004095650353962110L;
- private final ByteBuffer[] saved = new ByteBuffer[2];
+ public static final byte TYPE_ARRAY = 63;
+ public static final byte TYPE_KEY_VALUE = 62;
+ public static final byte TYPE_VALUE_VALUE = 61;
- private final Serializer[] serializer = new Serializer[2];
+ private transient File outputFile;
+ private transient RandomAccessFile outputRAF;
+ private transient FileChannel outputChannel;
+ private transient MappedByteBuffer fileBuffer;
+
+ private transient ByteBuffer[] saved;
+
+ private transient Serializer[] serializer;
//=====Setup========================================================================================================
public void open(String path) throws IOException {
+ saved = new ByteBuffer[2];
+ serializer = new Serializer[2];
setupMappedFile(path);
}
- private void setupMappedFile(String outputFilePath) throws FileNotFoundException, IOException {
+ private void setupMappedFile(String outputFilePath) throws IOException {
File x = new File(FLINK_TMP_DATA_DIR);
x.mkdirs();
@@ -73,6 +77,7 @@ public class PythonSender<IN> implements Serializable {
private void closeMappedFile() throws IOException {
outputChannel.close();
outputRAF.close();
+ outputFile.delete();
}
/**
@@ -128,7 +133,7 @@ public class PythonSender<IN> implements Serializable {
* @throws IOException
*/
@SuppressWarnings("unchecked")
- public int sendBuffer(Iterator i, int group) throws IOException {
+ public int sendBuffer(Iterator<?> i, int group) throws IOException {
fileBuffer.clear();
Object value;
@@ -165,20 +170,20 @@ public class PythonSender<IN> implements Serializable {
}
//=====Serializer===================================================================================================
- private Serializer getSerializer(Object value) {
+ private Serializer<?> getSerializer(Object value) {
if (value instanceof byte[]) {
return new ArraySerializer();
}
- if (((Tuple2) value).f0 instanceof byte[]) {
+ if (((Tuple2<?, ?>) value).f0 instanceof byte[]) {
return new ValuePairSerializer();
}
- if (((Tuple2) value).f0 instanceof Tuple) {
+ if (((Tuple2<?, ?>) value).f0 instanceof Tuple) {
return new KeyValuePairSerializer();
}
- throw new IllegalArgumentException("This object can't be serialized: " + value.toString());
+ throw new IllegalArgumentException("This object can't be serialized: " + value);
}
- private abstract class Serializer<T> {
+ private abstract static class Serializer<T> {
protected ByteBuffer buffer;
public ByteBuffer serialize(T value) {
@@ -190,7 +195,7 @@ public class PythonSender<IN> implements Serializable {
public abstract void serializeInternal(T value);
}
- private class ArraySerializer extends Serializer<byte[]> {
+ private static class ArraySerializer extends Serializer<byte[]> {
@Override
public void serializeInternal(byte[] value) {
buffer = ByteBuffer.allocate(value.length + 1);
@@ -199,7 +204,7 @@ public class PythonSender<IN> implements Serializable {
}
}
- private class ValuePairSerializer extends Serializer<Tuple2<byte[], byte[]>> {
+ private static class ValuePairSerializer extends Serializer<Tuple2<byte[], byte[]>> {
@Override
public void serializeInternal(Tuple2<byte[], byte[]> value) {
buffer = ByteBuffer.allocate(1 + value.f0.length + value.f1.length);
@@ -209,7 +214,7 @@ public class PythonSender<IN> implements Serializable {
}
}
- private class KeyValuePairSerializer extends Serializer<Tuple2<Tuple, byte[]>> {
+ private static class KeyValuePairSerializer extends Serializer<Tuple2<Tuple, byte[]>> {
@Override
public void serializeInternal(Tuple2<Tuple, byte[]> value) {
int keySize = 0;
http://git-wip-us.apache.org/repos/asf/flink/blob/30bb958a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index 13275c4..56ebf5b 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -45,8 +45,10 @@ import static org.apache.flink.python.api.PythonPlanBinder.PLANBINDER_CONFIG_BCV
/**
* This streamer is used by functions to send/receive data to/from an external python process.
*/
-public class PythonStreamer implements Serializable {
+public class PythonStreamer<IN1, IN2, OUT> implements Serializable {
protected static final Logger LOG = LoggerFactory.getLogger(PythonStreamer.class);
+ private static final long serialVersionUID = -2342256613658373170L;
+
private static final int SIGNAL_BUFFER_REQUEST = 0;
private static final int SIGNAL_BUFFER_REQUEST_G0 = -3;
private static final int SIGNAL_BUFFER_REQUEST_G1 = -4;
@@ -58,19 +60,16 @@ public class PythonStreamer implements Serializable {
private final boolean usePython3;
private final String planArguments;
- private String inputFilePath;
- private String outputFilePath;
-
- private Process process;
- private Thread shutdownThread;
- protected ServerSocket server;
- protected Socket socket;
- protected DataInputStream in;
- protected DataOutputStream out;
+ private transient Process process;
+ private transient Thread shutdownThread;
+ protected transient ServerSocket server;
+ protected transient Socket socket;
+ protected transient DataInputStream in;
+ protected transient DataOutputStream out;
protected int port;
protected PythonSender sender;
- protected PythonReceiver receiver;
+ protected PythonReceiver<OUT> receiver;
protected StringBuilder msg = new StringBuilder();
@@ -97,8 +96,8 @@ public class PythonStreamer implements Serializable {
}
private void startPython() throws IOException {
- this.outputFilePath = FLINK_TMP_DATA_DIR + "/" + id + this.function.getRuntimeContext().getIndexOfThisSubtask() + "output";
- this.inputFilePath = FLINK_TMP_DATA_DIR + "/" + id + this.function.getRuntimeContext().getIndexOfThisSubtask() + "input";
+ String outputFilePath = FLINK_TMP_DATA_DIR + "/" + id + this.function.getRuntimeContext().getIndexOfThisSubtask() + "output";
+ String inputFilePath = FLINK_TMP_DATA_DIR + "/" + id + this.function.getRuntimeContext().getIndexOfThisSubtask() + "input";
sender.open(inputFilePath);
receiver.open(outputFilePath);
@@ -110,7 +109,7 @@ public class PythonStreamer implements Serializable {
try {
Runtime.getRuntime().exec(pythonBinaryPath);
- } catch (IOException ex) {
+ } catch (IOException ignored) {
throw new RuntimeException(pythonBinaryPath + " does not point to a valid python binary.");
}
@@ -123,7 +122,7 @@ public class PythonStreamer implements Serializable {
public void run() {
try {
destroyProcess();
- } catch (IOException ex) {
+ } catch (IOException ignored) {
}
}
};
@@ -142,12 +141,12 @@ public class PythonStreamer implements Serializable {
try { // wait a bit to catch syntax errors
Thread.sleep(2000);
- } catch (InterruptedException ex) {
+ } catch (InterruptedException ignored) {
}
try {
process.exitValue();
throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely." + msg);
- } catch (IllegalThreadStateException ise) { //process still active -> start receiving data
+ } catch (IllegalThreadStateException ignored) { //process still active -> start receiving data
}
while (true) {
@@ -167,11 +166,10 @@ public class PythonStreamer implements Serializable {
int value = process.exitValue();
if (value != 0) {
throw new RuntimeException("Plan file caused an error. Check log-files for details.");
- }
- if (value == 0) {
+ } else {
throw new RuntimeException("Plan file exited prematurely without an error.");
}
- } catch (IllegalThreadStateException ise) {//Process still running
+ } catch (IllegalThreadStateException ignored) {//Process still running
}
}
@@ -186,7 +184,7 @@ public class PythonStreamer implements Serializable {
sender.close();
receiver.close();
} catch (Exception e) {
- LOG.error("Exception occurred while closing Streamer. :" + e.getMessage());
+ LOG.error("Exception occurred while closing Streamer. :{}", e.getMessage());
}
destroyProcess();
if (shutdownThread != null) {
@@ -197,18 +195,18 @@ public class PythonStreamer implements Serializable {
private void destroyProcess() throws IOException {
try {
process.exitValue();
- } catch (IllegalThreadStateException ise) { //process still active
+ } catch (IllegalThreadStateException ignored) { //process still active
if (process.getClass().getName().equals("java.lang.UNIXProcess")) {
int pid;
try {
Field f = process.getClass().getDeclaredField("pid");
f.setAccessible(true);
pid = f.getInt(process);
- } catch (Throwable e) {
+ } catch (Throwable ignore) {
process.destroy();
return;
}
- String[] args = new String[]{"kill", "-9", "" + pid};
+ String[] args = new String[]{"kill", "-9", String.valueOf(pid)};
Runtime.getRuntime().exec(args);
} else {
process.destroy();
@@ -247,7 +245,7 @@ public class PythonStreamer implements Serializable {
StringSerializer stringSerializer = new StringSerializer();
for (String name : names) {
- Iterator bcv = function.getRuntimeContext().getBroadcastVariable(name).iterator();
+ Iterator<?> bcv = function.getRuntimeContext().getBroadcastVariable(name).iterator();
out.write(stringSerializer.serializeWithoutTypeInfo(name));
@@ -269,7 +267,7 @@ public class PythonStreamer implements Serializable {
* @param c collector
* @throws IOException
*/
- public final void streamBufferWithoutGroups(Iterator i, Collector c) throws IOException {
+ public final void streamBufferWithoutGroups(Iterator<IN1> i, Collector<OUT> c) throws IOException {
try {
int size;
if (i.hasNext()) {
@@ -289,7 +287,7 @@ public class PythonStreamer implements Serializable {
case SIGNAL_ERROR:
try { //wait before terminating to ensure that the complete error message is printed
Thread.sleep(2000);
- } catch (InterruptedException ex) {
+ } catch (InterruptedException ignored) {
}
throw new RuntimeException(
"External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely due to an error." + msg);
@@ -300,7 +298,7 @@ public class PythonStreamer implements Serializable {
}
}
}
- } catch (SocketTimeoutException ste) {
+ } catch (SocketTimeoutException ignored) {
throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
}
}
@@ -313,7 +311,7 @@ public class PythonStreamer implements Serializable {
* @param c collector
* @throws IOException
*/
- public final void streamBufferWithGroups(Iterator i1, Iterator i2, Collector c) throws IOException {
+ public final void streamBufferWithGroups(Iterator<IN1> i1, Iterator<IN2> i2, Collector<OUT> c) throws IOException {
try {
int size;
if (i1.hasNext() || i2.hasNext()) {
@@ -337,7 +335,7 @@ public class PythonStreamer implements Serializable {
case SIGNAL_ERROR:
try { //wait before terminating to ensure that the complete error message is printed
Thread.sleep(2000);
- } catch (InterruptedException ex) {
+ } catch (InterruptedException ignored) {
}
throw new RuntimeException(
"External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely due to an error." + msg);
@@ -348,7 +346,7 @@ public class PythonStreamer implements Serializable {
}
}
}
- } catch (SocketTimeoutException ste) {
+ } catch (SocketTimeoutException ignored) {
throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/30bb958a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
index da83a06..6276302 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
@@ -15,7 +15,6 @@ package org.apache.flink.python.api.streaming.plan;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.io.Serializable;
import org.apache.flink.api.java.tuple.Tuple;
import static org.apache.flink.python.api.streaming.data.PythonReceiver.createTuple;
import static org.apache.flink.python.api.streaming.util.SerializationUtils.TYPE_BOOLEAN;
@@ -34,7 +33,7 @@ import org.apache.flink.python.api.types.CustomTypeWrapper;
/**
* Instances of this class can be used to receive data from the plan process.
*/
-public class PythonPlanReceiver implements Serializable {
+public class PythonPlanReceiver {
private final DataInputStream input;
public PythonPlanReceiver(InputStream input) {
@@ -50,7 +49,7 @@ public class PythonPlanReceiver implements Serializable {
}
private Deserializer getDeserializer() throws IOException {
- byte type = (byte) input.readByte();
+ byte type = input.readByte();
if (type >= 0 && type < 26) {
Deserializer[] d = new Deserializer[type];
for (int x = 0; x < d.length; x++) {
@@ -82,7 +81,8 @@ public class PythonPlanReceiver implements Serializable {
}
}
- private abstract class Deserializer<T> {
+ private abstract static class Deserializer<T> {
+
public T deserialize() throws IOException {
return deserialize(false);
}
@@ -90,8 +90,8 @@ public class PythonPlanReceiver implements Serializable {
public abstract T deserialize(boolean normalized) throws IOException;
}
- private class TupleDeserializer extends Deserializer<Tuple> {
- Deserializer[] deserializer;
+ private static class TupleDeserializer extends Deserializer<Tuple> {
+ private final Deserializer[] deserializer;
public TupleDeserializer(Deserializer[] deserializer) {
this.deserializer = deserializer;
http://git-wip-us.apache.org/repos/asf/flink/blob/30bb958a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
index 2f95822..8b8366a 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
@@ -15,13 +15,13 @@ package org.apache.flink.python.api.streaming.plan;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
-import java.io.Serializable;
import org.apache.flink.python.api.streaming.util.SerializationUtils;
/**
* Instances of this class can be used to send data to the plan process.
*/
-public class PythonPlanSender implements Serializable {
+public class PythonPlanSender {
+
private final DataOutputStream output;
public PythonPlanSender(OutputStream output) {
http://git-wip-us.apache.org/repos/asf/flink/blob/30bb958a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
index 7b0b63f..c27776b 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
@@ -14,9 +14,10 @@ package org.apache.flink.python.api.streaming.plan;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.python.api.streaming.util.StreamPrinter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.io.Serializable;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
@@ -29,7 +30,10 @@ import static org.apache.flink.python.api.PythonPlanBinder.usePython3;
/**
* Generic class to exchange data during the plan phase.
*/
-public class PythonPlanStreamer implements Serializable {
+public class PythonPlanStreamer {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(PythonPlanStreamer.class);
+
protected PythonPlanSender sender;
protected PythonPlanReceiver receiver;
@@ -70,7 +74,7 @@ public class PythonPlanStreamer implements Serializable {
try {
Runtime.getRuntime().exec(pythonBinaryPath);
- } catch (IOException ex) {
+ } catch (IOException ignored) {
throw new RuntimeException(pythonBinaryPath + " does not point to a valid python binary.");
}
process = Runtime.getRuntime().exec(pythonBinaryPath + " -B " + tmpPath + FLINK_PYTHON_PLAN_NAME + args);
@@ -80,7 +84,7 @@ public class PythonPlanStreamer implements Serializable {
try {
Thread.sleep(2000);
- } catch (InterruptedException ex) {
+ } catch (InterruptedException ignored) {
}
checkPythonProcessHealth();
@@ -93,9 +97,15 @@ public class PythonPlanStreamer implements Serializable {
public void close() {
try {
process.exitValue();
- } catch (NullPointerException npe) { //exception occurred before process was started
- } catch (IllegalThreadStateException ise) { //process still active
+ } catch (NullPointerException ignored) { //exception occurred before process was started
+ } catch (IllegalThreadStateException ignored) { //process still active
process.destroy();
+ } finally {
+ try {
+ socket.close();
+ } catch (IOException e) {
+ LOG.error("Failed to close socket.", e);
+ }
}
}
@@ -104,11 +114,10 @@ public class PythonPlanStreamer implements Serializable {
int value = process.exitValue();
if (value != 0) {
throw new RuntimeException("Plan file caused an error. Check log-files for details.");
- }
- if (value == 0) {
+ } else {
throw new RuntimeException("Plan file exited prematurely without an error.");
}
- } catch (IllegalThreadStateException ise) {//Process still running
+ } catch (IllegalThreadStateException ignored) {//Process still running
}
}
}
[2/2] flink git commit: [FLINK-5650] [py] Continuously check PyProcess health while waiting for incoming connection
Posted by ch...@apache.org.
[FLINK-5650] [py] Continuously check PyProcess health while waiting for incoming connection
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c2bb5de6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c2bb5de6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c2bb5de6
Branch: refs/heads/master
Commit: c2bb5de6faeb314bd8cdb8c279a5cd9bfb4df059
Parents: 03889ae
Author: zentol <ch...@apache.org>
Authored: Wed Mar 15 17:06:12 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Thu Mar 16 17:03:38 2017 +0100
----------------------------------------------------------------------
.../api/streaming/data/PythonStreamer.java | 23 ++++++++++++-
.../api/streaming/plan/PythonPlanStreamer.java | 35 ++++++++++++++------
2 files changed, 46 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c2bb5de6/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index c968bd6..13275c4 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -92,6 +92,7 @@ public class PythonStreamer implements Serializable {
*/
public void open() throws IOException {
server = new ServerSocket(0);
+ server.setSoTimeout(50);
startPython();
}
@@ -149,11 +150,31 @@ public class PythonStreamer implements Serializable {
} catch (IllegalThreadStateException ise) { //process still active -> start receiving data
}
- socket = server.accept();
+ while (true) {
+ try {
+ socket = server.accept();
+ break;
+ } catch (SocketTimeoutException ignored) {
+ checkPythonProcessHealth();
+ }
+ }
in = new DataInputStream(socket.getInputStream());
out = new DataOutputStream(socket.getOutputStream());
}
+ private void checkPythonProcessHealth() {
+ try {
+ int value = process.exitValue();
+ if (value != 0) {
+ throw new RuntimeException("Plan file caused an error. Check log-files for details.");
+ }
+ if (value == 0) {
+ throw new RuntimeException("Plan file exited prematurely without an error.");
+ }
+ } catch (IllegalThreadStateException ise) {//Process still running
+ }
+ }
+
/**
* Closes this streamer.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/c2bb5de6/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
index 06af9d8..7b0b63f 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
@@ -19,6 +19,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.net.ServerSocket;
import java.net.Socket;
+import java.net.SocketTimeoutException;
import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH;
import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH;
@@ -50,8 +51,16 @@ public class PythonPlanStreamer implements Serializable {
public void open(String tmpPath, String args) throws IOException {
server = new ServerSocket(0);
+ server.setSoTimeout(50);
startPython(tmpPath, args);
- socket = server.accept();
+ while (true) {
+ try {
+ socket = server.accept();
+ break;
+ } catch (SocketTimeoutException ignored) {
+ checkPythonProcessHealth();
+ }
+ }
sender = new PythonPlanSender(socket.getOutputStream());
receiver = new PythonPlanReceiver(socket.getInputStream());
}
@@ -74,16 +83,7 @@ public class PythonPlanStreamer implements Serializable {
} catch (InterruptedException ex) {
}
- try {
- int value = process.exitValue();
- if (value != 0) {
- throw new RuntimeException("Plan file caused an error. Check log-files for details.");
- }
- if (value == 0) {
- throw new RuntimeException("Plan file exited prematurely without an error.");
- }
- } catch (IllegalThreadStateException ise) {//Process still running
- }
+ checkPythonProcessHealth();
process.getOutputStream().write("plan\n".getBytes(ConfigConstants.DEFAULT_CHARSET));
process.getOutputStream().write((server.getLocalPort() + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
@@ -98,4 +98,17 @@ public class PythonPlanStreamer implements Serializable {
process.destroy();
}
}
+
+ private void checkPythonProcessHealth() {
+ try {
+ int value = process.exitValue();
+ if (value != 0) {
+ throw new RuntimeException("Plan file caused an error. Check log-files for details.");
+ }
+ if (value == 0) {
+ throw new RuntimeException("Plan file exited prematurely without an error.");
+ }
+ } catch (IllegalThreadStateException ise) {//Process still running
+ }
+ }
}