Posted to commits@flink.apache.org by ch...@apache.org on 2018/06/14 23:49:31 UTC

[4/7] flink git commit: [FLINK-9589][py] Make PythonOperationInfo immutable

[FLINK-9589][py] Make PythonOperationInfo immutable


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2fc64994
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2fc64994
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2fc64994

Branch: refs/heads/master
Commit: 2fc6499496296cf2aa2408e4e15d684496a169a2
Parents: eaff4da
Author: zentol <ch...@apache.org>
Authored: Thu Jun 14 18:02:49 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Jun 14 19:46:50 2018 +0200

----------------------------------------------------------------------
 .../flink/python/api/PythonOperationInfo.java   | 66 +++++++++++---------
 .../flink/python/api/PythonPlanBinder.java      | 23 ++++---
 2 files changed, 48 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
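The change declares every field of PythonOperationInfo final, assigns each exactly once in the constructor, and replaces the mutable array fields (keys, keys1, keys2, values) with unmodifiable List views. As a minimal sketch of that pattern (class and field names here are illustrative, not taken from the patch):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public final class OperationInfoSketch {
        public final int setID;          // assigned exactly once, in the constructor
        public final List<String> keys;  // read-only view of the keys

        public OperationInfoSketch(int setID, String[] keys) {
            this.setID = setID;
            // Wraps rather than copies: Arrays.asList is backed by the given
            // array, so wrapping keys.clone() would be the stricter,
            // fully defensive variant.
            this.keys = Collections.unmodifiableList(Arrays.asList(keys));
        }
    }
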


http://git-wip-us.apache.org/repos/asf/flink/blob/2fc64994/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
index 42eeffb..6d7f402 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
@@ -20,7 +20,10 @@ import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.python.api.streaming.plan.PythonPlanStreamer;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 
 import static org.apache.flink.api.java.typeutils.TypeExtractor.getForObject;
 
@@ -28,29 +31,29 @@ import static org.apache.flink.api.java.typeutils.TypeExtractor.getForObject;
  * Generic container for all information required to apply an operation to the DataSet API.
  */
 public class PythonOperationInfo {
-	public String identifier;
-	public int parentID; //DataSet that an operation is applied on
-	public int otherID; //secondary DataSet
-	public int setID; //ID for new DataSet
-	public String[] keys;
-	public String[] keys1; //join/cogroup keys
-	public String[] keys2; //join/cogroup keys
-	public TypeInformation<?> types; //typeinformation about output type
-	public Object[] values;
-	public int count;
-	public String field;
-	public Order order;
-	public String path;
-	public String fieldDelimiter;
-	public String lineDelimiter;
-	public long frm;
-	public long to;
-	public WriteMode writeMode;
-	public boolean toError;
-	public String name;
-	public boolean usesUDF;
-	public int parallelism;
-	public int envID;
+	public final String identifier;
+	public final int parentID; //DataSet that an operation is applied on
+	public final int otherID; //secondary DataSet
+	public final int setID; //ID for new DataSet
+	public final List<String> keys;
+	public final List<String> keys1; //join/cogroup keys
+	public final List<String> keys2; //join/cogroup keys
+	public final TypeInformation<?> types; //typeinformation about output type
+	public final List<Object> values;
+	public final int count;
+	public final String field;
+	public final Order order;
+	public final String path;
+	public final String fieldDelimiter;
+	public final String lineDelimiter;
+	public final long frm;
+	public final long to;
+	public final WriteMode writeMode;
+	public final boolean toError;
+	public final String name;
+	public final boolean usesUDF;
+	public final int parallelism;
+	public final int envID;
 
 	public PythonOperationInfo(PythonPlanStreamer streamer, int environmentID) throws IOException {
 		identifier = (String) streamer.getRecord();
@@ -75,9 +78,9 @@ public class PythonOperationInfo {
 				order = Order.NONE;
 				break;
 		}
-		keys = normalizeKeys(streamer.getRecord(true));
-		keys1 = normalizeKeys(streamer.getRecord(true));
-		keys2 = normalizeKeys(streamer.getRecord(true));
+		keys = Collections.unmodifiableList(Arrays.asList(normalizeKeys(streamer.getRecord(true))));
+		keys1 = Collections.unmodifiableList(Arrays.asList(normalizeKeys(streamer.getRecord(true))));
+		keys2 = Collections.unmodifiableList(Arrays.asList(normalizeKeys(streamer.getRecord(true))));
 		Object tmpType = streamer.getRecord();
 		types = tmpType == null ? null : getForObject(tmpType);
 		usesUDF = (Boolean) streamer.getRecord();
@@ -94,10 +97,11 @@ public class PythonOperationInfo {
 		toError = (Boolean) streamer.getRecord();
 		count = (Integer) streamer.getRecord(true);
 		int valueCount = (Integer) streamer.getRecord(true);
-		values = new Object[valueCount];
+		List<Object> valueList = new ArrayList<>(valueCount);
 		for (int x = 0; x < valueCount; x++) {
-			values[x] = streamer.getRecord();
+			valueList.add(streamer.getRecord());
 		}
+		values = valueList;
 		parallelism = (Integer) streamer.getRecord(true);
 
 		envID = environmentID;
@@ -111,9 +115,9 @@ public class PythonOperationInfo {
 		sb.append("OtherID: ").append(otherID).append("\n");
 		sb.append("Name: ").append(name).append("\n");
 		sb.append("Types: ").append(types).append("\n");
-		sb.append("Keys1: ").append(Arrays.toString(keys1)).append("\n");
-		sb.append("Keys2: ").append(Arrays.toString(keys2)).append("\n");
-		sb.append("Keys: ").append(Arrays.toString(keys)).append("\n");
+		sb.append("Keys1: ").append(keys1).append("\n");
+		sb.append("Keys2: ").append(keys2).append("\n");
+		sb.append("Keys: ").append(keys).append("\n");
 		sb.append("Count: ").append(count).append("\n");
 		sb.append("Field: ").append(field).append("\n");
 		sb.append("Order: ").append(order.toString()).append("\n");

http://git-wip-us.apache.org/repos/asf/flink/blob/2fc64994/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index a4f38fe..1df15a5 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -55,6 +55,7 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.List;
 import java.util.UUID;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipInputStream;
@@ -422,7 +423,7 @@ public class PythonPlanBinder {
 	}
 
 	private void createValueSource(ExecutionEnvironment env, PythonOperationInfo info) {
-		sets.add(info.setID, env.fromElements(info.values).setParallelism(info.parallelism).name("ValueSource")
+		sets.add(info.setID, env.fromCollection(info.values).setParallelism(info.parallelism).name("ValueSource")
 			.map(new SerializerMap<>()).setParallelism(info.parallelism).name("ValueSourcePostStep"));
 	}
 
@@ -470,7 +471,7 @@ public class PythonPlanBinder {
 	private <K extends Tuple> void createDistinctOperation(PythonOperationInfo info) {
 		DataSet<Tuple2<K, byte[]>> op = sets.getDataSet(info.parentID);
 		DataSet<byte[]> result = op
-			.distinct(info.keys).setParallelism(info.parallelism).name("Distinct")
+			.distinct(info.keys.toArray(new String[info.keys.size()])).setParallelism(info.parallelism).name("Distinct")
 			.map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("DistinctPostStep");
 		sets.add(info.setID, result);
 	}
@@ -495,13 +496,13 @@ public class PythonPlanBinder {
 
 	private void createGroupOperation(PythonOperationInfo info) {
 		DataSet<?> op1 = sets.getDataSet(info.parentID);
-		sets.add(info.setID, op1.groupBy(info.keys));
+		sets.add(info.setID, op1.groupBy(info.keys.toArray(new String[info.keys.size()])));
 	}
 
 	private <K extends Tuple> void createHashPartitionOperation(PythonOperationInfo info) {
 		DataSet<Tuple2<K, byte[]>> op1 = sets.getDataSet(info.parentID);
 		DataSet<byte[]> result = op1
-			.partitionByHash(info.keys).setParallelism(info.parallelism)
+			.partitionByHash(info.keys.toArray(new String[info.keys.size()])).setParallelism(info.parallelism)
 			.map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("HashPartitionPostStep");
 		sets.add(info.setID, result);
 	}
@@ -530,8 +531,8 @@ public class PythonPlanBinder {
 	private <IN1, IN2, OUT> void createCoGroupOperation(PythonOperationInfo info, TypeInformation<OUT> type) {
 		DataSet<IN1> op1 = sets.getDataSet(info.parentID);
 		DataSet<IN2> op2 = sets.getDataSet(info.otherID);
-		Keys.ExpressionKeys<IN1> key1 = new Keys.ExpressionKeys<>(info.keys1, op1.getType());
-		Keys.ExpressionKeys<IN2> key2 = new Keys.ExpressionKeys<>(info.keys2, op2.getType());
+		Keys.ExpressionKeys<IN1> key1 = new Keys.ExpressionKeys<>(info.keys1.toArray(new String[info.keys1.size()]), op1.getType());
+		Keys.ExpressionKeys<IN2> key2 = new Keys.ExpressionKeys<>(info.keys2.toArray(new String[info.keys2.size()]), op2.getType());
 		PythonCoGroup<IN1, IN2, OUT> pcg = new PythonCoGroup<>(operatorConfig, info.envID, info.setID, type);
 		sets.add(info.setID, new CoGroupRawOperator<>(op1, op2, key1, key2, pcg, type, info.name).setParallelism(info.parallelism));
 	}
@@ -623,19 +624,21 @@ public class PythonPlanBinder {
 		}
 	}
 
-	private <IN1, IN2> DataSet<Tuple2<byte[], byte[]>> createDefaultJoin(DataSet<IN1> op1, DataSet<IN2> op2, String[] firstKeys, String[] secondKeys, DatasizeHint mode, int parallelism) {
+	private <IN1, IN2> DataSet<Tuple2<byte[], byte[]>> createDefaultJoin(DataSet<IN1> op1, DataSet<IN2> op2, List<String> firstKeys, List<String> secondKeys, DatasizeHint mode, int parallelism) {
+		String[] firstKeysArray = firstKeys.toArray(new String[firstKeys.size()]);
+		String[] secondKeysArray = secondKeys.toArray(new String[secondKeys.size()]);
 		switch (mode) {
 			case NONE:
 				return op1
-					.join(op2).where(firstKeys).equalTo(secondKeys).setParallelism(parallelism)
+					.join(op2).where(firstKeysArray).equalTo(secondKeysArray).setParallelism(parallelism)
 					.map(new NestedKeyDiscarder<Tuple2<IN1, IN2>>()).setParallelism(parallelism).name("DefaultJoinPostStep");
 			case HUGE:
 				return op1
-					.joinWithHuge(op2).where(firstKeys).equalTo(secondKeys).setParallelism(parallelism)
+					.joinWithHuge(op2).where(firstKeysArray).equalTo(secondKeysArray).setParallelism(parallelism)
 					.map(new NestedKeyDiscarder<Tuple2<IN1, IN2>>()).setParallelism(parallelism).name("DefaultJoinPostStep");
 			case TINY:
 				return op1
-					.joinWithTiny(op2).where(firstKeys).equalTo(secondKeys).setParallelism(parallelism)
+					.joinWithTiny(op2).where(firstKeysArray).equalTo(secondKeysArray).setParallelism(parallelism)
 					.map(new NestedKeyDiscarder<Tuple2<IN1, IN2>>()).setParallelism(parallelism).name("DefaultJoinPostStep");
 			default:
 				throw new IllegalArgumentException("Invalid join mode specified.");
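
Because the DataSet API methods used above (distinct, groupBy, partitionByHash, where/equalTo) still take String... parameters, the binder converts each List<String> back with keys.toArray(new String[keys.size()]). A minimal sketch of that idiom, using a hypothetical helper name (PythonPlanBinder performs the conversion inline):

    import java.util.Arrays;
    import java.util.List;

    public class KeysToArrayDemo {
        // Hypothetical helper mirroring the inline conversion in PythonPlanBinder.
        static String[] toStringArray(List<String> keys) {
            // Passing a correctly sized array avoids a second allocation.
            return keys.toArray(new String[keys.size()]);
        }

        public static void main(String[] args) {
            List<String> keys = Arrays.asList("f0", "f1");
            System.out.println(Arrays.toString(toStringArray(keys)));  // [f0, f1]
        }
    }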