You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/06/14 23:49:31 UTC
[4/7] flink git commit: [FLINK-9589][py] Make PythonOperationInfo
immutable
[FLINK-9589][py] Make PythonOperationInfo immutable
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2fc64994
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2fc64994
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2fc64994
Branch: refs/heads/master
Commit: 2fc6499496296cf2aa2408e4e15d684496a169a2
Parents: eaff4da
Author: zentol <ch...@apache.org>
Authored: Thu Jun 14 18:02:49 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Jun 14 19:46:50 2018 +0200
----------------------------------------------------------------------
.../flink/python/api/PythonOperationInfo.java | 66 +++++++++++---------
.../flink/python/api/PythonPlanBinder.java | 23 ++++---
2 files changed, 48 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2fc64994/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
index 42eeffb..6d7f402 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
@@ -20,7 +20,10 @@ import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.python.api.streaming.plan.PythonPlanStreamer;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
import static org.apache.flink.api.java.typeutils.TypeExtractor.getForObject;
@@ -28,29 +31,29 @@ import static org.apache.flink.api.java.typeutils.TypeExtractor.getForObject;
* Generic container for all information required to apply an operation to the DataSet API.
*/
public class PythonOperationInfo {
- public String identifier;
- public int parentID; //DataSet that an operation is applied on
- public int otherID; //secondary DataSet
- public int setID; //ID for new DataSet
- public String[] keys;
- public String[] keys1; //join/cogroup keys
- public String[] keys2; //join/cogroup keys
- public TypeInformation<?> types; //typeinformation about output type
- public Object[] values;
- public int count;
- public String field;
- public Order order;
- public String path;
- public String fieldDelimiter;
- public String lineDelimiter;
- public long frm;
- public long to;
- public WriteMode writeMode;
- public boolean toError;
- public String name;
- public boolean usesUDF;
- public int parallelism;
- public int envID;
+ public final String identifier;
+ public final int parentID; //DataSet that an operation is applied on
+ public final int otherID; //secondary DataSet
+ public final int setID; //ID for new DataSet
+ public final List<String> keys;
+ public final List<String> keys1; //join/cogroup keys
+ public final List<String> keys2; //join/cogroup keys
+ public final TypeInformation<?> types; //typeinformation about output type
+ public final List<Object> values;
+ public final int count;
+ public final String field;
+ public final Order order;
+ public final String path;
+ public final String fieldDelimiter;
+ public final String lineDelimiter;
+ public final long frm;
+ public final long to;
+ public final WriteMode writeMode;
+ public final boolean toError;
+ public final String name;
+ public final boolean usesUDF;
+ public final int parallelism;
+ public final int envID;
public PythonOperationInfo(PythonPlanStreamer streamer, int environmentID) throws IOException {
identifier = (String) streamer.getRecord();
@@ -75,9 +78,9 @@ public class PythonOperationInfo {
order = Order.NONE;
break;
}
- keys = normalizeKeys(streamer.getRecord(true));
- keys1 = normalizeKeys(streamer.getRecord(true));
- keys2 = normalizeKeys(streamer.getRecord(true));
+ keys = Collections.unmodifiableList(Arrays.asList(normalizeKeys(streamer.getRecord(true))));
+ keys1 = Collections.unmodifiableList(Arrays.asList(normalizeKeys(streamer.getRecord(true))));
+ keys2 = Collections.unmodifiableList(Arrays.asList(normalizeKeys(streamer.getRecord(true))));
Object tmpType = streamer.getRecord();
types = tmpType == null ? null : getForObject(tmpType);
usesUDF = (Boolean) streamer.getRecord();
@@ -94,10 +97,11 @@ public class PythonOperationInfo {
toError = (Boolean) streamer.getRecord();
count = (Integer) streamer.getRecord(true);
int valueCount = (Integer) streamer.getRecord(true);
- values = new Object[valueCount];
+ List<Object> valueList = new ArrayList<>(valueCount);
for (int x = 0; x < valueCount; x++) {
- values[x] = streamer.getRecord();
+ valueList.add(streamer.getRecord());
}
+ values = valueList;
parallelism = (Integer) streamer.getRecord(true);
envID = environmentID;
@@ -111,9 +115,9 @@ public class PythonOperationInfo {
sb.append("OtherID: ").append(otherID).append("\n");
sb.append("Name: ").append(name).append("\n");
sb.append("Types: ").append(types).append("\n");
- sb.append("Keys1: ").append(Arrays.toString(keys1)).append("\n");
- sb.append("Keys2: ").append(Arrays.toString(keys2)).append("\n");
- sb.append("Keys: ").append(Arrays.toString(keys)).append("\n");
+ sb.append("Keys1: ").append(keys1).append("\n");
+ sb.append("Keys2: ").append(keys2).append("\n");
+ sb.append("Keys: ").append(keys).append("\n");
sb.append("Count: ").append(count).append("\n");
sb.append("Field: ").append(field).append("\n");
sb.append("Order: ").append(order.toString()).append("\n");
http://git-wip-us.apache.org/repos/asf/flink/blob/2fc64994/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index a4f38fe..1df15a5 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -55,6 +55,7 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
+import java.util.List;
import java.util.UUID;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
@@ -422,7 +423,7 @@ public class PythonPlanBinder {
}
private void createValueSource(ExecutionEnvironment env, PythonOperationInfo info) {
- sets.add(info.setID, env.fromElements(info.values).setParallelism(info.parallelism).name("ValueSource")
+ sets.add(info.setID, env.fromCollection(info.values).setParallelism(info.parallelism).name("ValueSource")
.map(new SerializerMap<>()).setParallelism(info.parallelism).name("ValueSourcePostStep"));
}
@@ -470,7 +471,7 @@ public class PythonPlanBinder {
private <K extends Tuple> void createDistinctOperation(PythonOperationInfo info) {
DataSet<Tuple2<K, byte[]>> op = sets.getDataSet(info.parentID);
DataSet<byte[]> result = op
- .distinct(info.keys).setParallelism(info.parallelism).name("Distinct")
+ .distinct(info.keys.toArray(new String[info.keys.size()])).setParallelism(info.parallelism).name("Distinct")
.map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("DistinctPostStep");
sets.add(info.setID, result);
}
@@ -495,13 +496,13 @@ public class PythonPlanBinder {
private void createGroupOperation(PythonOperationInfo info) {
DataSet<?> op1 = sets.getDataSet(info.parentID);
- sets.add(info.setID, op1.groupBy(info.keys));
+ sets.add(info.setID, op1.groupBy(info.keys.toArray(new String[info.keys.size()])));
}
private <K extends Tuple> void createHashPartitionOperation(PythonOperationInfo info) {
DataSet<Tuple2<K, byte[]>> op1 = sets.getDataSet(info.parentID);
DataSet<byte[]> result = op1
- .partitionByHash(info.keys).setParallelism(info.parallelism)
+ .partitionByHash(info.keys.toArray(new String[info.keys.size()])).setParallelism(info.parallelism)
.map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("HashPartitionPostStep");
sets.add(info.setID, result);
}
@@ -530,8 +531,8 @@ public class PythonPlanBinder {
private <IN1, IN2, OUT> void createCoGroupOperation(PythonOperationInfo info, TypeInformation<OUT> type) {
DataSet<IN1> op1 = sets.getDataSet(info.parentID);
DataSet<IN2> op2 = sets.getDataSet(info.otherID);
- Keys.ExpressionKeys<IN1> key1 = new Keys.ExpressionKeys<>(info.keys1, op1.getType());
- Keys.ExpressionKeys<IN2> key2 = new Keys.ExpressionKeys<>(info.keys2, op2.getType());
+ Keys.ExpressionKeys<IN1> key1 = new Keys.ExpressionKeys<>(info.keys1.toArray(new String[info.keys1.size()]), op1.getType());
+ Keys.ExpressionKeys<IN2> key2 = new Keys.ExpressionKeys<>(info.keys2.toArray(new String[info.keys2.size()]), op2.getType());
PythonCoGroup<IN1, IN2, OUT> pcg = new PythonCoGroup<>(operatorConfig, info.envID, info.setID, type);
sets.add(info.setID, new CoGroupRawOperator<>(op1, op2, key1, key2, pcg, type, info.name).setParallelism(info.parallelism));
}
@@ -623,19 +624,21 @@ public class PythonPlanBinder {
}
}
- private <IN1, IN2> DataSet<Tuple2<byte[], byte[]>> createDefaultJoin(DataSet<IN1> op1, DataSet<IN2> op2, String[] firstKeys, String[] secondKeys, DatasizeHint mode, int parallelism) {
+ private <IN1, IN2> DataSet<Tuple2<byte[], byte[]>> createDefaultJoin(DataSet<IN1> op1, DataSet<IN2> op2, List<String> firstKeys, List<String> secondKeys, DatasizeHint mode, int parallelism) {
+ String[] firstKeysArray = firstKeys.toArray(new String[firstKeys.size()]);
+ String[] secondKeysArray = secondKeys.toArray(new String[secondKeys.size()]);
switch (mode) {
case NONE:
return op1
- .join(op2).where(firstKeys).equalTo(secondKeys).setParallelism(parallelism)
+ .join(op2).where(firstKeysArray).equalTo(secondKeysArray).setParallelism(parallelism)
.map(new NestedKeyDiscarder<Tuple2<IN1, IN2>>()).setParallelism(parallelism).name("DefaultJoinPostStep");
case HUGE:
return op1
- .joinWithHuge(op2).where(firstKeys).equalTo(secondKeys).setParallelism(parallelism)
+ .joinWithHuge(op2).where(firstKeysArray).equalTo(secondKeysArray).setParallelism(parallelism)
.map(new NestedKeyDiscarder<Tuple2<IN1, IN2>>()).setParallelism(parallelism).name("DefaultJoinPostStep");
case TINY:
return op1
- .joinWithTiny(op2).where(firstKeys).equalTo(secondKeys).setParallelism(parallelism)
+ .joinWithTiny(op2).where(firstKeysArray).equalTo(secondKeysArray).setParallelism(parallelism)
.map(new NestedKeyDiscarder<Tuple2<IN1, IN2>>()).setParallelism(parallelism).name("DefaultJoinPostStep");
default:
throw new IllegalArgumentException("Invalid join mode specified.");