You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/03/16 16:05:46 UTC

[1/2] flink git commit: [refactor] [py] General code clean-up

Repository: flink
Updated Branches:
  refs/heads/master 03889ae1f -> 30bb958a7


[refactor] [py] General code clean-up

- added missing serialVersionUID
- PythonPlan* classes no longer implement serializable
- marked several fields as transient
- replaced some fields by local variables
- input/output files are now being deleted
- removed some unnecessary casts
- renamed several ignored exceptions
- close socket in PythonPlanStreamer
- simplified PyProcess health check
- eliminated several uses of raw types
- removed some unthrown exception declarations


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/30bb958a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/30bb958a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/30bb958a

Branch: refs/heads/master
Commit: 30bb958a73e74ced94548897962740a59677dc12
Parents: c2bb5de
Author: zentol <ch...@apache.org>
Authored: Thu Mar 16 13:24:21 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Thu Mar 16 17:03:38 2017 +0100

----------------------------------------------------------------------
 .../python/api/functions/PythonCoGroup.java     | 11 ++--
 .../api/functions/PythonMapPartition.java       | 11 ++--
 .../api/streaming/data/PythonReceiver.java      | 22 +++----
 .../python/api/streaming/data/PythonSender.java | 47 ++++++++-------
 .../api/streaming/data/PythonStreamer.java      | 60 ++++++++++----------
 .../api/streaming/plan/PythonPlanReceiver.java  | 12 ++--
 .../api/streaming/plan/PythonPlanSender.java    |  4 +-
 .../api/streaming/plan/PythonPlanStreamer.java  | 27 ++++++---
 8 files changed, 106 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/30bb958a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
index 33d88c3..9da5a4c 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
@@ -27,13 +27,16 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
  * @param <IN2>
  * @param <OUT>
  */
-public class PythonCoGroup<IN1, IN2, OUT> extends RichCoGroupFunction<IN1, IN2, OUT> implements ResultTypeQueryable {
-	private final PythonStreamer streamer;
-	private transient final TypeInformation<OUT> typeInformation;
+public class PythonCoGroup<IN1, IN2, OUT> extends RichCoGroupFunction<IN1, IN2, OUT> implements ResultTypeQueryable<OUT> {
+
+	private static final long serialVersionUID = -3997396583317513873L;
+
+	private final PythonStreamer<IN1, IN2, OUT> streamer;
+	private final transient TypeInformation<OUT> typeInformation;
 
 	public PythonCoGroup(int id, TypeInformation<OUT> typeInformation) {
 		this.typeInformation = typeInformation;
-		streamer = new PythonStreamer(this, id, true);
+		streamer = new PythonStreamer<>(this, id, true);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/30bb958a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
index 6282210..c596d6c 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
@@ -28,13 +28,16 @@ import org.apache.flink.util.Collector;
  * @param <IN>
  * @param <OUT>
  */
-public class PythonMapPartition<IN, OUT> extends RichMapPartitionFunction<IN, OUT> implements ResultTypeQueryable {
-	private final PythonStreamer streamer;
-	private transient final TypeInformation<OUT> typeInformation;
+public class PythonMapPartition<IN, OUT> extends RichMapPartitionFunction<IN, OUT> implements ResultTypeQueryable<OUT>{
+
+	private static final long serialVersionUID = 3866306483023916413L;
+
+	private final PythonStreamer<IN, IN, OUT> streamer;
+	private final transient TypeInformation<OUT> typeInformation;
 
 	public PythonMapPartition(int id, TypeInformation<OUT> typeInformation) {
 		this.typeInformation = typeInformation;
-		streamer = new PythonStreamer(this, id, typeInformation instanceof PrimitiveArrayTypeInfo);
+		streamer = new PythonStreamer<>(this, id, typeInformation instanceof PrimitiveArrayTypeInfo);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/30bb958a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
index 83de746..ba5d96a 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
@@ -13,7 +13,6 @@
 package org.apache.flink.python.api.streaming.data;
 
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.io.Serializable;
@@ -28,17 +27,17 @@ import org.apache.flink.util.Collector;
 /**
  * This class is used to read data from memory-mapped files.
  */
-public class PythonReceiver implements Serializable {
+public class PythonReceiver<OUT> implements Serializable {
 	private static final long serialVersionUID = -2474088929850009968L;
 
-	private File inputFile;
-	private RandomAccessFile inputRAF;
-	private FileChannel inputChannel;
-	private MappedByteBuffer fileBuffer;
+	private transient File inputFile;
+	private transient RandomAccessFile inputRAF;
+	private transient FileChannel inputChannel;
+	private transient MappedByteBuffer fileBuffer;
 
 	private final boolean readAsByteArray;
 
-	private Deserializer<?> deserializer = null;
+	private transient Deserializer<OUT> deserializer;
 
 	public PythonReceiver(boolean usesByteArray) {
 		readAsByteArray = usesByteArray;
@@ -47,10 +46,10 @@ public class PythonReceiver implements Serializable {
 	//=====Setup========================================================================================================
 	public void open(String path) throws IOException {
 		setupMappedFile(path);
-		deserializer = readAsByteArray ? new ByteArrayDeserializer() : new TupleDeserializer();
+		deserializer = (Deserializer<OUT>) (readAsByteArray ? new ByteArrayDeserializer() : new TupleDeserializer());
 	}
 
-	private void setupMappedFile(String inputFilePath) throws FileNotFoundException, IOException {
+	private void setupMappedFile(String inputFilePath) throws IOException {
 		File x = new File(FLINK_TMP_DATA_DIR);
 		x.mkdirs();
 
@@ -75,6 +74,7 @@ public class PythonReceiver implements Serializable {
 	private void closeMappedFile() throws IOException {
 		inputChannel.close();
 		inputRAF.close();
+		inputFile.delete();
 	}
 
 
@@ -89,7 +89,7 @@ public class PythonReceiver implements Serializable {
 	 * @throws IOException
 	 */
 	@SuppressWarnings({ "rawtypes", "unchecked" })
-	public void collectBuffer(Collector c, int bufferSize) throws IOException {
+	public void collectBuffer(Collector<OUT> c, int bufferSize) throws IOException {
 		fileBuffer.position(0);
 
 		while (fileBuffer.position() < bufferSize) {
@@ -99,7 +99,7 @@ public class PythonReceiver implements Serializable {
 
 	//=====Deserializer=================================================================================================
 	private interface Deserializer<T> {
-		public T deserialize();
+		T deserialize();
 	}
 
 	private class ByteArrayDeserializer implements Deserializer<byte[]> {

http://git-wip-us.apache.org/repos/asf/flink/blob/30bb958a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
index c371e9d..8c40a6f 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
@@ -13,7 +13,6 @@
 package org.apache.flink.python.api.streaming.data;
 
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.io.Serializable;
@@ -29,26 +28,31 @@ import static org.apache.flink.python.api.PythonPlanBinder.MAPPED_FILE_SIZE;
 /**
  * General-purpose class to write data to memory-mapped files.
  */
-public class PythonSender<IN> implements Serializable {
-	public static final byte TYPE_ARRAY = (byte) 63;
-	public static final byte TYPE_KEY_VALUE = (byte) 62;
-	public static final byte TYPE_VALUE_VALUE = (byte) 61;
+public class PythonSender implements Serializable {
 
-	private File outputFile;
-	private RandomAccessFile outputRAF;
-	private FileChannel outputChannel;
-	private MappedByteBuffer fileBuffer;
+	private static final long serialVersionUID = -2004095650353962110L;
 
-	private final ByteBuffer[] saved = new ByteBuffer[2];
+	public static final byte TYPE_ARRAY = 63;
+	public static final byte TYPE_KEY_VALUE = 62;
+	public static final byte TYPE_VALUE_VALUE = 61;
 
-	private final Serializer[] serializer = new Serializer[2];
+	private transient File outputFile;
+	private transient RandomAccessFile outputRAF;
+	private transient FileChannel outputChannel;
+	private transient MappedByteBuffer fileBuffer;
+
+	private transient ByteBuffer[] saved;
+
+	private transient Serializer[] serializer;
 
 	//=====Setup========================================================================================================
 	public void open(String path) throws IOException {
+		saved = new ByteBuffer[2];
+		serializer = new Serializer[2];
 		setupMappedFile(path);
 	}
 
-	private void setupMappedFile(String outputFilePath) throws FileNotFoundException, IOException {
+	private void setupMappedFile(String outputFilePath) throws IOException {
 		File x = new File(FLINK_TMP_DATA_DIR);
 		x.mkdirs();
 
@@ -73,6 +77,7 @@ public class PythonSender<IN> implements Serializable {
 	private void closeMappedFile() throws IOException {
 		outputChannel.close();
 		outputRAF.close();
+		outputFile.delete();
 	}
 
 	/**
@@ -128,7 +133,7 @@ public class PythonSender<IN> implements Serializable {
 	 * @throws IOException
 	 */
 	@SuppressWarnings("unchecked")
-	public int sendBuffer(Iterator i, int group) throws IOException {
+	public int sendBuffer(Iterator<?> i, int group) throws IOException {
 		fileBuffer.clear();
 
 		Object value;
@@ -165,20 +170,20 @@ public class PythonSender<IN> implements Serializable {
 	}
 
 	//=====Serializer===================================================================================================
-	private Serializer getSerializer(Object value) {
+	private Serializer<?> getSerializer(Object value) {
 		if (value instanceof byte[]) {
 			return new ArraySerializer();
 		}
-		if (((Tuple2) value).f0 instanceof byte[]) {
+		if (((Tuple2<?, ?>) value).f0 instanceof byte[]) {
 			return new ValuePairSerializer();
 		}
-		if (((Tuple2) value).f0 instanceof Tuple) {
+		if (((Tuple2<?, ?>) value).f0 instanceof Tuple) {
 			return new KeyValuePairSerializer();
 		}
-		throw new IllegalArgumentException("This object can't be serialized: " + value.toString());
+		throw new IllegalArgumentException("This object can't be serialized: " + value);
 	}
 
-	private abstract class Serializer<T> {
+	private abstract static class Serializer<T> {
 		protected ByteBuffer buffer;
 
 		public ByteBuffer serialize(T value) {
@@ -190,7 +195,7 @@ public class PythonSender<IN> implements Serializable {
 		public abstract void serializeInternal(T value);
 	}
 
-	private class ArraySerializer extends Serializer<byte[]> {
+	private static class ArraySerializer extends Serializer<byte[]> {
 		@Override
 		public void serializeInternal(byte[] value) {
 			buffer = ByteBuffer.allocate(value.length + 1);
@@ -199,7 +204,7 @@ public class PythonSender<IN> implements Serializable {
 		}
 	}
 
-	private class ValuePairSerializer extends Serializer<Tuple2<byte[], byte[]>> {
+	private static class ValuePairSerializer extends Serializer<Tuple2<byte[], byte[]>> {
 		@Override
 		public void serializeInternal(Tuple2<byte[], byte[]> value) {
 			buffer = ByteBuffer.allocate(1 + value.f0.length + value.f1.length);
@@ -209,7 +214,7 @@ public class PythonSender<IN> implements Serializable {
 		}
 	}
 
-	private class KeyValuePairSerializer extends Serializer<Tuple2<Tuple, byte[]>> {
+	private static class KeyValuePairSerializer extends Serializer<Tuple2<Tuple, byte[]>> {
 		@Override
 		public void serializeInternal(Tuple2<Tuple, byte[]> value) {
 			int keySize = 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/30bb958a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index 13275c4..56ebf5b 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -45,8 +45,10 @@ import static org.apache.flink.python.api.PythonPlanBinder.PLANBINDER_CONFIG_BCV
 /**
  * This streamer is used by functions to send/receive data to/from an external python process.
  */
-public class PythonStreamer implements Serializable {
+public class PythonStreamer<IN1, IN2, OUT> implements Serializable {
 	protected static final Logger LOG = LoggerFactory.getLogger(PythonStreamer.class);
+	private static final long serialVersionUID = -2342256613658373170L;
+
 	private static final int SIGNAL_BUFFER_REQUEST = 0;
 	private static final int SIGNAL_BUFFER_REQUEST_G0 = -3;
 	private static final int SIGNAL_BUFFER_REQUEST_G1 = -4;
@@ -58,19 +60,16 @@ public class PythonStreamer implements Serializable {
 	private final boolean usePython3;
 	private final String planArguments;
 
-	private String inputFilePath;
-	private String outputFilePath;
-
-	private Process process;
-	private Thread shutdownThread;
-	protected ServerSocket server;
-	protected Socket socket;
-	protected DataInputStream in;
-	protected DataOutputStream out;
+	private transient Process process;
+	private transient Thread shutdownThread;
+	protected transient ServerSocket server;
+	protected transient Socket socket;
+	protected transient DataInputStream in;
+	protected transient DataOutputStream out;
 	protected int port;
 
 	protected PythonSender sender;
-	protected PythonReceiver receiver;
+	protected PythonReceiver<OUT> receiver;
 
 	protected StringBuilder msg = new StringBuilder();
 
@@ -97,8 +96,8 @@ public class PythonStreamer implements Serializable {
 	}
 
 	private void startPython() throws IOException {
-		this.outputFilePath = FLINK_TMP_DATA_DIR + "/" + id + this.function.getRuntimeContext().getIndexOfThisSubtask() + "output";
-		this.inputFilePath = FLINK_TMP_DATA_DIR + "/" + id + this.function.getRuntimeContext().getIndexOfThisSubtask() + "input";
+		String outputFilePath = FLINK_TMP_DATA_DIR + "/" + id + this.function.getRuntimeContext().getIndexOfThisSubtask() + "output";
+		String inputFilePath = FLINK_TMP_DATA_DIR + "/" + id + this.function.getRuntimeContext().getIndexOfThisSubtask() + "input";
 
 		sender.open(inputFilePath);
 		receiver.open(outputFilePath);
@@ -110,7 +109,7 @@ public class PythonStreamer implements Serializable {
 
 		try {
 			Runtime.getRuntime().exec(pythonBinaryPath);
-		} catch (IOException ex) {
+		} catch (IOException ignored) {
 			throw new RuntimeException(pythonBinaryPath + " does not point to a valid python binary.");
 		}
 
@@ -123,7 +122,7 @@ public class PythonStreamer implements Serializable {
 			public void run() {
 				try {
 					destroyProcess();
-				} catch (IOException ex) {
+				} catch (IOException ignored) {
 				}
 			}
 		};
@@ -142,12 +141,12 @@ public class PythonStreamer implements Serializable {
 
 		try { // wait a bit to catch syntax errors
 			Thread.sleep(2000);
-		} catch (InterruptedException ex) {
+		} catch (InterruptedException ignored) {
 		}
 		try {
 			process.exitValue();
 			throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely." + msg);
-		} catch (IllegalThreadStateException ise) { //process still active -> start receiving data
+		} catch (IllegalThreadStateException ignored) { //process still active -> start receiving data
 		}
 
 		while (true) {
@@ -167,11 +166,10 @@ public class PythonStreamer implements Serializable {
 			int value = process.exitValue();
 			if (value != 0) {
 				throw new RuntimeException("Plan file caused an error. Check log-files for details.");
-			}
-			if (value == 0) {
+			} else {
 				throw new RuntimeException("Plan file exited prematurely without an error.");
 			}
-		} catch (IllegalThreadStateException ise) {//Process still running
+		} catch (IllegalThreadStateException ignored) {//Process still running
 		}
 	}
 
@@ -186,7 +184,7 @@ public class PythonStreamer implements Serializable {
 			sender.close();
 			receiver.close();
 		} catch (Exception e) {
-			LOG.error("Exception occurred while closing Streamer. :" + e.getMessage());
+			LOG.error("Exception occurred while closing Streamer. :{}", e.getMessage());
 		}
 		destroyProcess();
 		if (shutdownThread != null) {
@@ -197,18 +195,18 @@ public class PythonStreamer implements Serializable {
 	private void destroyProcess() throws IOException {
 		try {
 			process.exitValue();
-		} catch (IllegalThreadStateException ise) { //process still active
+		} catch (IllegalThreadStateException ignored) { //process still active
 			if (process.getClass().getName().equals("java.lang.UNIXProcess")) {
 				int pid;
 				try {
 					Field f = process.getClass().getDeclaredField("pid");
 					f.setAccessible(true);
 					pid = f.getInt(process);
-				} catch (Throwable e) {
+				} catch (Throwable ignore) {
 					process.destroy();
 					return;
 				}
-				String[] args = new String[]{"kill", "-9", "" + pid};
+				String[] args = new String[]{"kill", "-9", String.valueOf(pid)};
 				Runtime.getRuntime().exec(args);
 			} else {
 				process.destroy();
@@ -247,7 +245,7 @@ public class PythonStreamer implements Serializable {
 
 			StringSerializer stringSerializer = new StringSerializer();
 			for (String name : names) {
-				Iterator bcv = function.getRuntimeContext().getBroadcastVariable(name).iterator();
+				Iterator<?> bcv = function.getRuntimeContext().getBroadcastVariable(name).iterator();
 
 				out.write(stringSerializer.serializeWithoutTypeInfo(name));
 
@@ -269,7 +267,7 @@ public class PythonStreamer implements Serializable {
 	 * @param c collector
 	 * @throws IOException
 	 */
-	public final void streamBufferWithoutGroups(Iterator i, Collector c) throws IOException {
+	public final void streamBufferWithoutGroups(Iterator<IN1> i, Collector<OUT> c) throws IOException {
 		try {
 			int size;
 			if (i.hasNext()) {
@@ -289,7 +287,7 @@ public class PythonStreamer implements Serializable {
 						case SIGNAL_ERROR:
 							try { //wait before terminating to ensure that the complete error message is printed
 								Thread.sleep(2000);
-							} catch (InterruptedException ex) {
+							} catch (InterruptedException ignored) {
 							}
 							throw new RuntimeException(
 									"External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely due to an error." + msg);
@@ -300,7 +298,7 @@ public class PythonStreamer implements Serializable {
 					}
 				}
 			}
-		} catch (SocketTimeoutException ste) {
+		} catch (SocketTimeoutException ignored) {
 			throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
 		}
 	}
@@ -313,7 +311,7 @@ public class PythonStreamer implements Serializable {
 	 * @param c collector
 	 * @throws IOException
 	 */
-	public final void streamBufferWithGroups(Iterator i1, Iterator i2, Collector c) throws IOException {
+	public final void streamBufferWithGroups(Iterator<IN1> i1, Iterator<IN2> i2, Collector<OUT> c) throws IOException {
 		try {
 			int size;
 			if (i1.hasNext() || i2.hasNext()) {
@@ -337,7 +335,7 @@ public class PythonStreamer implements Serializable {
 						case SIGNAL_ERROR:
 							try { //wait before terminating to ensure that the complete error message is printed
 								Thread.sleep(2000);
-							} catch (InterruptedException ex) {
+							} catch (InterruptedException ignored) {
 							}
 							throw new RuntimeException(
 									"External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely due to an error." + msg);
@@ -348,7 +346,7 @@ public class PythonStreamer implements Serializable {
 					}
 				}
 			}
-		} catch (SocketTimeoutException ste) {
+		} catch (SocketTimeoutException ignored) {
 			throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/30bb958a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
index da83a06..6276302 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
@@ -15,7 +15,6 @@ package org.apache.flink.python.api.streaming.plan;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.Serializable;
 import org.apache.flink.api.java.tuple.Tuple;
 import static org.apache.flink.python.api.streaming.data.PythonReceiver.createTuple;
 import static org.apache.flink.python.api.streaming.util.SerializationUtils.TYPE_BOOLEAN;
@@ -34,7 +33,7 @@ import org.apache.flink.python.api.types.CustomTypeWrapper;
 /**
  * Instances of this class can be used to receive data from the plan process.
  */
-public class PythonPlanReceiver implements Serializable {
+public class PythonPlanReceiver {
 	private final DataInputStream input;
 
 	public PythonPlanReceiver(InputStream input) {
@@ -50,7 +49,7 @@ public class PythonPlanReceiver implements Serializable {
 	}
 
 	private Deserializer getDeserializer() throws IOException {
-		byte type = (byte) input.readByte();
+		byte type = input.readByte();
 		if (type >= 0 && type < 26) {
 				Deserializer[] d = new Deserializer[type];
 				for (int x = 0; x < d.length; x++) {
@@ -82,7 +81,8 @@ public class PythonPlanReceiver implements Serializable {
 		}
 	}
 
-	private abstract class Deserializer<T> {
+	private abstract static class Deserializer<T> {
+		
 		public T deserialize() throws IOException {
 			return deserialize(false);
 		}
@@ -90,8 +90,8 @@ public class PythonPlanReceiver implements Serializable {
 		public abstract T deserialize(boolean normalized) throws IOException;
 	}
 
-	private class TupleDeserializer extends Deserializer<Tuple> {
-		Deserializer[] deserializer;
+	private static class TupleDeserializer extends Deserializer<Tuple> {
+		private final  Deserializer[] deserializer;
 
 		public TupleDeserializer(Deserializer[] deserializer) {
 			this.deserializer = deserializer;

http://git-wip-us.apache.org/repos/asf/flink/blob/30bb958a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
index 2f95822..8b8366a 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
@@ -15,13 +15,13 @@ package org.apache.flink.python.api.streaming.plan;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.io.Serializable;
 import org.apache.flink.python.api.streaming.util.SerializationUtils;
 
 /**
  * Instances of this class can be used to send data to the plan process.
  */
-public class PythonPlanSender implements Serializable {
+public class PythonPlanSender {
+
 	private final DataOutputStream output;
 
 	public PythonPlanSender(OutputStream output) {

http://git-wip-us.apache.org/repos/asf/flink/blob/30bb958a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
index 7b0b63f..c27776b 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
@@ -14,9 +14,10 @@ package org.apache.flink.python.api.streaming.plan;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.python.api.streaming.util.StreamPrinter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
@@ -29,7 +30,10 @@ import static org.apache.flink.python.api.PythonPlanBinder.usePython3;
 /**
  * Generic class to exchange data during the plan phase.
  */
-public class PythonPlanStreamer implements Serializable {
+public class PythonPlanStreamer {
+
+	protected static final Logger LOG = LoggerFactory.getLogger(PythonPlanStreamer.class);
+
 	protected PythonPlanSender sender;
 	protected PythonPlanReceiver receiver;
 
@@ -70,7 +74,7 @@ public class PythonPlanStreamer implements Serializable {
 
 		try {
 			Runtime.getRuntime().exec(pythonBinaryPath);
-		} catch (IOException ex) {
+		} catch (IOException ignored) {
 			throw new RuntimeException(pythonBinaryPath + " does not point to a valid python binary.");
 		}
 		process = Runtime.getRuntime().exec(pythonBinaryPath + " -B " + tmpPath + FLINK_PYTHON_PLAN_NAME + args);
@@ -80,7 +84,7 @@ public class PythonPlanStreamer implements Serializable {
 
 		try {
 			Thread.sleep(2000);
-		} catch (InterruptedException ex) {
+		} catch (InterruptedException ignored) {
 		}
 
 		checkPythonProcessHealth();
@@ -93,9 +97,15 @@ public class PythonPlanStreamer implements Serializable {
 	public void close() {
 		try {
 			process.exitValue();
-		} catch (NullPointerException npe) { //exception occurred before process was started
-		} catch (IllegalThreadStateException ise) { //process still active
+		} catch (NullPointerException ignored) { //exception occurred before process was started
+		} catch (IllegalThreadStateException ignored) { //process still active
 			process.destroy();
+		} finally {
+			try {
+				socket.close();
+			} catch (IOException e) {
+				LOG.error("Failed to close socket.", e);
+			}
 		}
 	}
 
@@ -104,11 +114,10 @@ public class PythonPlanStreamer implements Serializable {
 			int value = process.exitValue();
 			if (value != 0) {
 				throw new RuntimeException("Plan file caused an error. Check log-files for details.");
-			}
-			if (value == 0) {
+			} else {
 				throw new RuntimeException("Plan file exited prematurely without an error.");
 			}
-		} catch (IllegalThreadStateException ise) {//Process still running
+		} catch (IllegalThreadStateException ignored) {//Process still running
 		}
 	}
 }


[2/2] flink git commit: [FLINK-5650] [py] Continuously check PyProcess health while waiting for incoming connection

Posted by ch...@apache.org.
[FLINK-5650] [py] Continuously check PyProcess health while waiting for incoming connection


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c2bb5de6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c2bb5de6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c2bb5de6

Branch: refs/heads/master
Commit: c2bb5de6faeb314bd8cdb8c279a5cd9bfb4df059
Parents: 03889ae
Author: zentol <ch...@apache.org>
Authored: Wed Mar 15 17:06:12 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Thu Mar 16 17:03:38 2017 +0100

----------------------------------------------------------------------
 .../api/streaming/data/PythonStreamer.java      | 23 ++++++++++++-
 .../api/streaming/plan/PythonPlanStreamer.java  | 35 ++++++++++++++------
 2 files changed, 46 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c2bb5de6/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index c968bd6..13275c4 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -92,6 +92,7 @@ public class PythonStreamer implements Serializable {
 	 */
 	public void open() throws IOException {
 		server = new ServerSocket(0);
+		server.setSoTimeout(50);
 		startPython();
 	}
 
@@ -149,11 +150,31 @@ public class PythonStreamer implements Serializable {
 		} catch (IllegalThreadStateException ise) { //process still active -> start receiving data
 		}
 
-		socket = server.accept();
+		while (true) {
+			try {
+				socket = server.accept();
+				break;
+			} catch (SocketTimeoutException ignored) {
+				checkPythonProcessHealth();
+			}
+		}
 		in = new DataInputStream(socket.getInputStream());
 		out = new DataOutputStream(socket.getOutputStream());
 	}
 
+	private void checkPythonProcessHealth() {
+		try {
+			int value = process.exitValue();
+			if (value != 0) {
+				throw new RuntimeException("Plan file caused an error. Check log-files for details.");
+			}
+			if (value == 0) {
+				throw new RuntimeException("Plan file exited prematurely without an error.");
+			}
+		} catch (IllegalThreadStateException ise) {//Process still running
+		}
+	}
+
 	/**
 	 * Closes this streamer.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/c2bb5de6/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
index 06af9d8..7b0b63f 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
@@ -19,6 +19,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.net.SocketTimeoutException;
 
 import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH;
 import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH;
@@ -50,8 +51,16 @@ public class PythonPlanStreamer implements Serializable {
 
 	public void open(String tmpPath, String args) throws IOException {
 		server = new ServerSocket(0);
+		server.setSoTimeout(50);
 		startPython(tmpPath, args);
-		socket = server.accept();
+		while (true) {
+			try {
+				socket = server.accept();
+				break;
+			} catch (SocketTimeoutException ignored) {
+				checkPythonProcessHealth();
+			}
+		}
 		sender = new PythonPlanSender(socket.getOutputStream());
 		receiver = new PythonPlanReceiver(socket.getInputStream());
 	}
@@ -74,16 +83,7 @@ public class PythonPlanStreamer implements Serializable {
 		} catch (InterruptedException ex) {
 		}
 
-		try {
-			int value = process.exitValue();
-			if (value != 0) {
-				throw new RuntimeException("Plan file caused an error. Check log-files for details.");
-			}
-			if (value == 0) {
-				throw new RuntimeException("Plan file exited prematurely without an error.");
-			}
-		} catch (IllegalThreadStateException ise) {//Process still running
-		}
+		checkPythonProcessHealth();
 
 		process.getOutputStream().write("plan\n".getBytes(ConfigConstants.DEFAULT_CHARSET));
 		process.getOutputStream().write((server.getLocalPort() + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
@@ -98,4 +98,17 @@ public class PythonPlanStreamer implements Serializable {
 			process.destroy();
 		}
 	}
+
+	private void checkPythonProcessHealth() {
+		try {
+			int value = process.exitValue();
+			if (value != 0) {
+				throw new RuntimeException("Plan file caused an error. Check log-files for details.");
+			}
+			if (value == 0) {
+				throw new RuntimeException("Plan file exited prematurely without an error.");
+			}
+		} catch (IllegalThreadStateException ise) {//Process still running
+		}
+	}
 }