You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2014/09/29 18:49:11 UTC
[4/4] git commit: Change Partition Operator to actual general-purpose
Operator
Change the Partition Operator into an actual general-purpose Operator.
It is no longer a special-case operator that can only be used in front
of map-style operations.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/01e74da3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/01e74da3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/01e74da3
Branch: refs/heads/master
Commit: 01e74da3e8b10dba0afb2606f663f8e37104b283
Parents: f0ed58c
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Sep 26 12:52:08 2014 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Sep 29 18:16:36 2014 +0200
----------------------------------------------------------------------
.../java/org/apache/flink/api/java/DataSet.java | 30 +--
.../flink/api/java/operators/DataSource.java | 46 +---
.../api/java/operators/FilterOperator.java | 14 +-
.../api/java/operators/FlatMapOperator.java | 12 -
.../flink/api/java/operators/MapOperator.java | 15 +-
.../java/operators/MapPartitionOperator.java | 12 -
.../flink/api/java/operators/Operator.java | 4 +-
.../api/java/operators/PartitionOperator.java | 146 +++++++++++
.../api/java/operators/PartitionedDataSet.java | 246 -------------------
.../test/javaApiOperators/PartitionITCase.java | 2 +-
10 files changed, 167 insertions(+), 360 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/01e74da3/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 76363bd..bd28cde 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -20,7 +20,6 @@ package org.apache.flink.api.java;
import org.apache.commons.lang3.Validate;
import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
@@ -58,7 +57,7 @@ import org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets;
import org.apache.flink.api.java.operators.Keys;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.MapPartitionOperator;
-import org.apache.flink.api.java.operators.PartitionedDataSet;
+import org.apache.flink.api.java.operators.PartitionOperator;
import org.apache.flink.api.java.operators.ProjectOperator.Projection;
import org.apache.flink.api.java.operators.ReduceOperator;
import org.apache.flink.api.java.operators.SortedGrouping;
@@ -230,13 +229,13 @@ public abstract class DataSet<T> {
* Initiates a Project transformation on a {@link Tuple} {@link DataSet}.<br/>
* <b>Note: Only Tuple DataSets can be projected.</b></br>
* The transformation projects each Tuple of the DataSet onto a (sub)set of fields.</br>
- * This method returns a {@link Projection} on which {@link Projection#types()} needs to
+ * This method returns a {@link Projection} on which {@link Projection#types} needs to
* be called to complete the transformation.
*
* @param fieldIndexes The field indexes of the input tuples that are retained.
* The order of fields in the output tuple corresponds to the order of field indexes.
* @return A Projection that needs to be converted into a {@link org.apache.flink.api.java.operators.ProjectOperator} to complete the
- * Project transformation by calling {@link Projection#types()}.
+ * Project transformation by calling {@link Projection#types}.
*
* @see Tuple
* @see DataSet
@@ -542,7 +541,7 @@ public abstract class DataSet<T> {
* joining elements into one DataSet.</br>
*
* This method returns a {@link JoinOperatorSets} on which
- * {@link JoinOperatorSets#where()} needs to be called to define the join key of the first
+ * {@link JoinOperatorSets#where} needs to be called to define the join key of the first
* joining (i.e., this) DataSet.
*
* @param other The other DataSet with which this DataSet is joined.
@@ -563,7 +562,7 @@ public abstract class DataSet<T> {
* This method also gives the hint to the optimizer that the second DataSet to join is much
* smaller than the first one.</br>
* This method returns a {@link JoinOperatorSets} on which
- * {@link JoinOperatorSets#where()} needs to be called to define the join key of the first
+ * {@link JoinOperatorSets#where} needs to be called to define the join key of the first
* joining (i.e., this) DataSet.
*
* @param other The other DataSet with which this DataSet is joined.
@@ -584,7 +583,7 @@ public abstract class DataSet<T> {
* This method also gives the hint to the optimizer that the second DataSet to join is much
* larger than the first one.</br>
* This method returns a {@link JoinOperatorSets JoinOperatorSet} on which
- * {@link JoinOperatorSets#where()} needs to be called to define the join key of the first
+ * {@link JoinOperatorSets#where} needs to be called to define the join key of the first
* joining (i.e., this) DataSet.
*
* @param other The other DataSet with which this DataSet is joined.
@@ -611,7 +610,7 @@ public abstract class DataSet<T> {
* The CoGroupFunction can iterate over the elements of both groups and return any number
* of elements including none.</br>
* This method returns a {@link CoGroupOperatorSets} on which
- * {@link CoGroupOperatorSets#where()} needs to be called to define the grouping key of the first
+ * {@link CoGroupOperatorSets#where} needs to be called to define the grouping key of the first
* (i.e., this) DataSet.
*
* @param other The other DataSet of the CoGroup transformation.
@@ -654,7 +653,8 @@ public abstract class DataSet<T> {
* second input being the second field of the tuple.
*
* <p>
- * Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a {@link CrossFunction} which is called for
+ * Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a
+ * {@link org.apache.flink.api.common.functions.CrossFunction} which is called for
* each pair of crossed elements. The CrossFunction returns exactly one element for each pair of input elements.<br/>
*
* @param other The other DataSet with which this DataSet is crossed.
@@ -862,8 +862,8 @@ public abstract class DataSet<T> {
* @param fields The field indexes on which the DataSet is hash-partitioned.
* @return The partitioned DataSet.
*/
- public PartitionedDataSet<T> partitionByHash(int... fields) {
- return new PartitionedDataSet<T>(this, PartitionMethod.HASH, new Keys.FieldPositionKeys<T>(fields, getType(), false));
+ public PartitionOperator<T> partitionByHash(int... fields) {
+ return new PartitionOperator<T>(this, PartitionMethod.HASH, new Keys.FieldPositionKeys<T>(fields, getType(), false));
}
/**
@@ -876,9 +876,9 @@ public abstract class DataSet<T> {
*
* @see KeySelector
*/
- public <K extends Comparable<K>> PartitionedDataSet<T> partitionByHash(KeySelector<T, K> keyExtractor) {
+ public <K extends Comparable<K>> PartitionOperator<T> partitionByHash(KeySelector<T, K> keyExtractor) {
final TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, type);
- return new PartitionedDataSet<T>(this, PartitionMethod.HASH, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, this.getType(), keyType));
+ return new PartitionOperator<T>(this, PartitionMethod.HASH, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, this.getType(), keyType));
}
/**
@@ -889,8 +889,8 @@ public abstract class DataSet<T> {
*
* @return The rebalanced DataSet.
*/
- public PartitionedDataSet<T> rebalance() {
- return new PartitionedDataSet<T>(this, PartitionMethod.REBALANCE);
+ public PartitionOperator<T> rebalance() {
+ return new PartitionOperator<T>(this, PartitionMethod.REBALANCE);
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/01e74da3/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
index 45bab86..9e3c4a2 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.operators.OperatorInformation;
import org.apache.flink.api.common.operators.base.GenericDataSourceBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
/**
@@ -32,13 +31,9 @@ import org.apache.flink.api.java.ExecutionEnvironment;
*
* @param <OUT> The type of the elements produced by this data source.
*/
-public class DataSource<OUT> extends DataSet<OUT> {
+public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> {
private final InputFormat<OUT, ?> inputFormat;
-
- private String name;
-
- private int dop = -1;
// --------------------------------------------------------------------------------------------
@@ -70,45 +65,6 @@ public class DataSource<OUT> extends DataSet<OUT> {
// --------------------------------------------------------------------------------------------
- /**
- * Sets the name of the data source operation. The name will be used for logging and other
- * messages. The default name is a textual representation of the input format.
- *
- * @param name The name for the data source.
- * @return The data source object itself, to allow for function call chaining.
- */
- public DataSource<OUT> name(String name) {
- this.name = name;
- return this;
- }
-
- /**
- * Returns the degree of parallelism of this data source.
- *
- * @return The degree of parallelism of this data source.
- */
- public int getParallelism() {
- return this.dop;
- }
-
- /**
- * Sets the degree of parallelism for this data source.
- * The degree must be 1 or more.
- *
- * @param dop The degree of parallelism for this data source.
- * @return This data source with set degree of parallelism.
- */
- public DataSource<OUT> setParallelism(int dop) {
- if(dop < 1) {
- throw new IllegalArgumentException("The parallelism of an operator must be at least 1.");
- }
- this.dop = dop;
-
- return this;
- }
-
- // --------------------------------------------------------------------------------------------
-
protected GenericDataSourceBase<OUT, ?> translateToDataFlow() {
String name = this.name != null ? this.name : this.inputFormat.toString();
if (name.length() > 100) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/01e74da3/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
index 727820a..ab8a6c5 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
@@ -33,8 +33,6 @@ import org.apache.flink.api.java.DataSet;
public class FilterOperator<T> extends SingleInputUdfOperator<T, T, FilterOperator<T>> {
protected final FilterFunction<T> function;
-
- protected PartitionedDataSet<T> partitionedDataSet;
public FilterOperator(DataSet<T> input, FilterFunction<T> function) {
super(input, input.getType());
@@ -43,19 +41,9 @@ public class FilterOperator<T> extends SingleInputUdfOperator<T, T, FilterOperat
extractSemanticAnnotationsFromUdf(function.getClass());
}
- public FilterOperator(PartitionedDataSet<T> input, FilterFunction<T> function) {
- this(input.getDataSet(), function);
- this.partitionedDataSet = input;
- }
-
@Override
protected org.apache.flink.api.common.operators.base.FilterOperatorBase<T, FlatMapFunction<T,T>> translateToDataFlow(Operator<T> input) {
-
- // inject partition operator if necessary
- if(this.partitionedDataSet != null) {
- input = this.partitionedDataSet.translateToDataFlow(input, this.getParallelism());
- }
-
+
String name = getName() != null ? getName() : function.getClass().getName();
// create operator
PlanFilterOperator<T> po = new PlanFilterOperator<T>(function, name, getInputType());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/01e74da3/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
index 0dc401e..b7e336f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
@@ -36,8 +36,6 @@ public class FlatMapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, Fl
protected final FlatMapFunction<IN, OUT> function;
- protected PartitionedDataSet<IN> partitionedDataSet;
-
public FlatMapOperator(DataSet<IN> input, TypeInformation<OUT> resultType, FlatMapFunction<IN, OUT> function) {
super(input, resultType);
@@ -45,19 +43,9 @@ public class FlatMapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, Fl
extractSemanticAnnotationsFromUdf(function.getClass());
}
- public FlatMapOperator(PartitionedDataSet<IN> input, TypeInformation<OUT> resultType, FlatMapFunction<IN, OUT> function) {
- this(input.getDataSet(), resultType, function);
- this.partitionedDataSet = input;
- }
-
@Override
protected org.apache.flink.api.common.operators.base.FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN,OUT>> translateToDataFlow(Operator<IN> input) {
- // inject partition operator if necessary
- if(this.partitionedDataSet != null) {
- input = this.partitionedDataSet.translateToDataFlow(input, this.getParallelism());
- }
-
String name = getName() != null ? getName() : function.getClass().getName();
// create operator
FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>> po = new FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/01e74da3/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
index e45cc96..f1ece2c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
@@ -37,10 +37,7 @@ import org.apache.flink.api.java.DataSet;
public class MapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapOperator<IN, OUT>> {
protected final MapFunction<IN, OUT> function;
-
- protected PartitionedDataSet<IN> partitionedDataSet;
-
-
+
public MapOperator(DataSet<IN> input, TypeInformation<OUT> resultType, MapFunction<IN, OUT> function) {
super(input, resultType);
@@ -49,19 +46,9 @@ public class MapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapOpe
extractSemanticAnnotationsFromUdf(function.getClass());
}
- public MapOperator(PartitionedDataSet<IN> input, TypeInformation<OUT> resultType, MapFunction<IN, OUT> function) {
- this(input.getDataSet(), resultType, function);
- this.partitionedDataSet = input;
- }
-
@Override
protected org.apache.flink.api.common.operators.base.MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) {
- // inject partition operator if necessary
- if(this.partitionedDataSet != null) {
- input = this.partitionedDataSet.translateToDataFlow(input, this.getParallelism());
- }
-
String name = getName() != null ? getName() : function.getClass().getName();
// create operator
MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> po = new MapOperatorBase<IN, OUT, MapFunction<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/01e74da3/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
index 9c896f9..839298b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
@@ -38,8 +38,6 @@ public class MapPartitionOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
protected final MapPartitionFunction<IN, OUT> function;
- protected PartitionedDataSet<IN> partitionedDataSet;
-
public MapPartitionOperator(DataSet<IN> input, TypeInformation<OUT> resultType, MapPartitionFunction<IN, OUT> function) {
super(input, resultType);
@@ -47,19 +45,9 @@ public class MapPartitionOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
extractSemanticAnnotationsFromUdf(function.getClass());
}
- public MapPartitionOperator(PartitionedDataSet<IN> input, TypeInformation<OUT> resultType, MapPartitionFunction<IN, OUT> function) {
- this(input.getDataSet(), resultType, function);
- this.partitionedDataSet = input;
- }
-
@Override
protected MapPartitionOperatorBase<IN, OUT, MapPartitionFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) {
- // inject partition operator if necessary
- if(this.partitionedDataSet != null) {
- input = this.partitionedDataSet.translateToDataFlow(input, this.getParallelism());
- }
-
String name = getName() != null ? getName() : function.getClass().getName();
// create operator
MapPartitionOperatorBase<IN, OUT, MapPartitionFunction<IN, OUT>> po = new MapPartitionOperatorBase<IN, OUT, MapPartitionFunction<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/01e74da3/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
index 53ff546..0f8a3eb 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
@@ -30,9 +30,9 @@ import org.apache.flink.api.java.ExecutionEnvironment;
*/
public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<OUT> {
- private String name;
+ protected String name;
- private int dop = -1;
+ protected int dop = -1;
protected Operator(ExecutionEnvironment context, TypeInformation<OUT> resultType) {
super(context, resultType);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/01e74da3/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
new file mode 100644
index 0000000..f0931b5
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
+import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
+import org.apache.flink.api.java.operators.translation.KeyRemovingMapper;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+
+/**
+ * This operator represents a partitioning of a {@link DataSet}.
+ *
+ * <p>Rebalancing and hash partitioning are translated; range partitioning is
+ * rejected when the operator is constructed.
+ *
+ * @param <T> The type of the data being partitioned.
+ */
+public class PartitionOperator<T> extends SingleInputUdfOperator<T, T, PartitionOperator<T>> {
+
+	/** Default name of the translated partition operator when the user set none. */
+	private static final String DEFAULT_NAME = "Partition";
+
+	private final Keys<T> pKeys;
+	private final PartitionMethod pMethod;
+
+	/**
+	 * Creates a partition operator with explicit partitioning keys.
+	 *
+	 * @param input The DataSet to partition. The result type equals the input type.
+	 * @param pMethod The partitioning method.
+	 * @param pKeys The keys to partition on. Required for hash partitioning.
+	 * @throws IllegalArgumentException If hash partitioning is requested without keys, or if
+	 *         field position keys are used on a DataSet that is not of a tuple type.
+	 * @throws UnsupportedOperationException If range partitioning is requested.
+	 */
+	public PartitionOperator(DataSet<T> input, PartitionMethod pMethod, Keys<T> pKeys) {
+		super(input, input.getType());
+
+		if (pMethod == PartitionMethod.HASH && pKeys == null) {
+			throw new IllegalArgumentException("Hash Partitioning requires keys");
+		} else if (pMethod == PartitionMethod.RANGE) {
+			throw new UnsupportedOperationException("Range Partitioning not yet supported");
+		}
+
+		if (pKeys instanceof Keys.FieldPositionKeys<?> && !input.getType().isTupleType()) {
+			throw new IllegalArgumentException("Hash Partitioning with key fields only possible on Tuple DataSets");
+		}
+
+		this.pMethod = pMethod;
+		this.pKeys = pKeys;
+	}
+
+	/**
+	 * Creates a partition operator without keys (delegates with {@code null} keys).
+	 *
+	 * @param input The DataSet to partition.
+	 * @param pMethod The partitioning method.
+	 */
+	public PartitionOperator(DataSet<T> input, PartitionMethod pMethod) {
+		this(input, pMethod, null);
+	}
+
+	/**
+	 * Translates this partitioning into common-API operators.
+	 *
+	 * <p>Rebalancing and hash partitioning on field positions produce a single
+	 * {@link PartitionOperatorBase}. Hash partitioning on a key selector produces a chain of
+	 * key-extracting map, partition operator and key-removing map; the last one is returned.
+	 *
+	 * @param input The translated input of this operator.
+	 * @return The operator that ends the translated partitioning.
+	 * @throws UnsupportedOperationException If the partition method or the key type is not supported.
+	 */
+	@Override
+	protected org.apache.flink.api.common.operators.SingleInputOperator<?, T, ?> translateToDataFlow(Operator<T> input) {
+
+		// honor a user-defined name, like the other single-input operators do
+		String name = getName() != null ? getName() : DEFAULT_NAME;
+
+		// distinguish between partition types
+		if (pMethod == PartitionMethod.REBALANCE) {
+
+			UnaryOperatorInformation<T, T> operatorInfo = new UnaryOperatorInformation<T, T>(getType(), getType());
+			PartitionOperatorBase<T> noop = new PartitionOperatorBase<T>(operatorInfo, pMethod, name);
+			// set input
+			noop.setInput(input);
+			// set DOP
+			noop.setDegreeOfParallelism(getParallelism());
+
+			return noop;
+		}
+		else if (pMethod == PartitionMethod.HASH) {
+
+			if (pKeys instanceof Keys.FieldPositionKeys) {
+
+				int[] logicalKeyPositions = pKeys.computeLogicalKeyPositions();
+				UnaryOperatorInformation<T, T> operatorInfo = new UnaryOperatorInformation<T, T>(getType(), getType());
+				PartitionOperatorBase<T> noop = new PartitionOperatorBase<T>(operatorInfo, pMethod, logicalKeyPositions, name);
+				// set input
+				noop.setInput(input);
+				// set DOP
+				noop.setDegreeOfParallelism(getParallelism());
+
+				return noop;
+			} else if (pKeys instanceof Keys.SelectorFunctionKeys) {
+
+				@SuppressWarnings("unchecked")
+				Keys.SelectorFunctionKeys<T, ?> selectorKeys = (Keys.SelectorFunctionKeys<T, ?>) pKeys;
+				MapOperatorBase<?, T, ?> po = translateSelectorFunctionPartitioner(selectorKeys, pMethod, getType(), name, input, getParallelism());
+				return po;
+			}
+			else {
+				throw new UnsupportedOperationException("Unrecognized key type.");
+			}
+
+		}
+		else if (pMethod == PartitionMethod.RANGE) {
+			throw new UnsupportedOperationException("Range partitioning not yet supported");
+		}
+
+		// fail loudly instead of handing a null operator to the plan translation
+		throw new UnsupportedOperationException("Unsupported partition method: " + pMethod);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Translates a partitioning on key-selector keys into three chained operators:
+	 * a map that wraps each element into a {@code Tuple2<key, element>}, a partition operator
+	 * on tuple field 0, and a map that unwraps the element again.
+	 *
+	 * @param rawKeys The key-selector keys; cast to the key type {@code K} inside this method.
+	 * @param pMethod The partitioning method passed to the partition operator.
+	 * @param inputType The type of the elements being partitioned.
+	 * @param name The name given to the partition operator.
+	 * @param input The translated input, which feeds the key-extracting map.
+	 * @param partitionDop The parallelism of the partition operator and the key-removing map.
+	 * @return The key-removing map that ends the chain.
+	 */
+	private static <T, K> MapOperatorBase<Tuple2<K, T>, T, ?> translateSelectorFunctionPartitioner(Keys.SelectorFunctionKeys<T, ?> rawKeys,
+			PartitionMethod pMethod, TypeInformation<T> inputType, String name, Operator<T> input, int partitionDop)
+	{
+		@SuppressWarnings("unchecked")
+		final Keys.SelectorFunctionKeys<T, K> keys = (Keys.SelectorFunctionKeys<T, K>) rawKeys;
+
+		TypeInformation<Tuple2<K, T>> typeInfoWithKey = new TupleTypeInfo<Tuple2<K, T>>(keys.getKeyType(), inputType);
+		UnaryOperatorInformation<Tuple2<K, T>, Tuple2<K, T>> operatorInfo = new UnaryOperatorInformation<Tuple2<K, T>, Tuple2<K, T>>(typeInfoWithKey, typeInfoWithKey);
+
+		KeyExtractingMapper<T, K> extractor = new KeyExtractingMapper<T, K>(keys.getKeyExtractor());
+
+		MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>> keyExtractingMap = new MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>>(extractor, new UnaryOperatorInformation<T, Tuple2<K, T>>(inputType, typeInfoWithKey), "Key Extractor");
+		// the extracted key is tuple field 0, so that is the field partitioned on
+		PartitionOperatorBase<Tuple2<K, T>> noop = new PartitionOperatorBase<Tuple2<K, T>>(operatorInfo, pMethod, new int[]{0}, name);
+		MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>> keyRemovingMap = new MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>>(new KeyRemovingMapper<T, K>(), new UnaryOperatorInformation<Tuple2<K, T>, T>(typeInfoWithKey, inputType), "Key Remover");
+
+		keyExtractingMap.setInput(input);
+		noop.setInput(keyExtractingMap);
+		keyRemovingMap.setInput(noop);
+
+		// the key extractor keeps the parallelism of the input; the partition operator and
+		// the key remover use the parallelism requested for the partitioning
+		keyExtractingMap.setDegreeOfParallelism(input.getDegreeOfParallelism());
+		noop.setDegreeOfParallelism(partitionDop);
+		keyRemovingMap.setDegreeOfParallelism(partitionDop);
+
+		return keyRemovingMap;
+	}
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/01e74da3/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionedDataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionedDataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionedDataSet.java
deleted file mode 100644
index a30be20..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionedDataSet.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.operators;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.MapPartitionFunction;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.UnaryOperatorInformation;
-import org.apache.flink.api.common.operators.base.MapOperatorBase;
-import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
-import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
-import org.apache.flink.api.java.operators.translation.KeyRemovingMapper;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-
-public class PartitionedDataSet<IN> {
-
- private final DataSet<IN> dataSet;
-
- private final Keys<IN> pKeys;
- private final PartitionMethod pMethod;
-
- public PartitionedDataSet(DataSet<IN> input, PartitionMethod pMethod, Keys<IN> pKeys) {
- this.dataSet = input;
-
- if(pMethod == PartitionMethod.HASH && pKeys == null) {
- throw new IllegalArgumentException("Hash Partitioning requires keys");
- } else if(pMethod == PartitionMethod.RANGE) {
- throw new UnsupportedOperationException("Range Partitioning not yet supported");
- }
-
- if(pKeys instanceof Keys.FieldPositionKeys<?> && !input.getType().isTupleType()) {
- throw new IllegalArgumentException("Hash Partitioning with key fields only possible on Tuple DataSets");
- }
-
- this.pMethod = pMethod;
- this.pKeys = pKeys;
- }
-
- public PartitionedDataSet(DataSet<IN> input, PartitionMethod pMethod) {
- this(input, pMethod, null);
- }
-
- public DataSet<IN> getDataSet() {
- return this.dataSet;
- }
-
-
- /**
- * Applies a Map transformation on a {@link DataSet}.<br/>
- * The transformation calls a {@link org.apache.flink.api.java.functions.RichMapFunction} for each element of the DataSet.
- * Each MapFunction call returns exactly one element.
- *
- * @param mapper The MapFunction that is called for each element of the DataSet.
- * @return A MapOperator that represents the transformed DataSet.
- *
- * @see org.apache.flink.api.java.functions.RichMapFunction
- * @see MapOperator
- * @see DataSet
- */
- public <R> MapOperator<IN, R> map(MapFunction<IN, R> mapper) {
- if (mapper == null) {
- throw new NullPointerException("Map function must not be null.");
- }
-
- final TypeInformation<R> resultType = TypeExtractor.getMapReturnTypes(mapper, dataSet.getType());
-
- return new MapOperator<IN, R>(this, resultType, mapper);
- }
-
- /**
- * Applies a Map-style operation to the entire partition of the data.
- * The function is called once per parallel partition of the data,
- * and the entire partition is available through the given Iterator.
- * The number of elements that each instance of the MapPartition function
- * sees is non-deterministic and depends on the degree of parallelism of the operation.
- *
- * This function is intended for operations that cannot transform individual elements
- * and that require no grouping of elements. To transform individual elements,
- * the use of {@code map()} and {@code flatMap()} is preferable.
- *
- * @param mapPartition The MapPartitionFunction that is called for the full DataSet.
- * @return A MapPartitionOperator that represents the transformed DataSet.
- *
- * @see MapPartitionFunction
- * @see MapPartitionOperator
- * @see DataSet
- */
- public <R> MapPartitionOperator<IN, R> mapPartition(MapPartitionFunction<IN, R> mapPartition ){
- if (mapPartition == null) {
- throw new NullPointerException("MapPartition function must not be null.");
- }
-
- final TypeInformation<R> resultType = TypeExtractor.getMapPartitionReturnTypes(mapPartition, dataSet.getType());
-
- return new MapPartitionOperator<IN, R>(this, resultType, mapPartition);
- }
-
- /**
- * Applies a FlatMap transformation on a {@link DataSet}.<br/>
- * The transformation calls a {@link org.apache.flink.api.java.functions.RichFlatMapFunction} for each element of the DataSet.
- * Each FlatMapFunction call can return any number of elements including none.
- *
- * @param flatMapper The FlatMapFunction that is called for each element of the DataSet.
- * @return A FlatMapOperator that represents the transformed DataSet.
- *
- * @see org.apache.flink.api.java.functions.RichFlatMapFunction
- * @see FlatMapOperator
- * @see DataSet
- */
- public <R> FlatMapOperator<IN, R> flatMap(FlatMapFunction<IN, R> flatMapper) {
- if (flatMapper == null) {
- throw new NullPointerException("FlatMap function must not be null.");
- }
-
- TypeInformation<R> resultType = TypeExtractor.getFlatMapReturnTypes(flatMapper, dataSet.getType());
-
- return new FlatMapOperator<IN, R>(this, resultType, flatMapper);
- }
-
- /**
- * Applies a Filter transformation on a {@link DataSet}.<br/>
- * The transformation calls a {@link org.apache.flink.api.java.functions.RichFilterFunction} for each element of the DataSet
- * and retains only those elements for which the function returns true. Elements for
- * which the function returns false are filtered.
- *
- * @param filter The FilterFunction that is called for each element of the DataSet.
- * @return A FilterOperator that represents the filtered DataSet.
- *
- * @see org.apache.flink.api.java.functions.RichFilterFunction
- * @see FilterOperator
- * @see DataSet
- */
- public FilterOperator<IN> filter(FilterFunction<IN> filter) {
- if (filter == null) {
- throw new NullPointerException("Filter function must not be null.");
- }
- return new FilterOperator<IN>(this, filter);
- }
-
-
- /*
- * Translation of partitioning
- */
-
- protected org.apache.flink.api.common.operators.SingleInputOperator<?, IN, ?> translateToDataFlow(Operator<IN> input, int partitionDop) {
-
- String name = "Partition";
-
- // distinguish between partition types
- if (pMethod == PartitionMethod.REBALANCE) {
-
- UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<IN, IN>(dataSet.getType(), dataSet.getType());
- PartitionOperatorBase<IN> noop = new PartitionOperatorBase<IN>(operatorInfo, pMethod, name);
- // set input
- noop.setInput(input);
- // set DOP
- noop.setDegreeOfParallelism(partitionDop);
-
- return noop;
- }
- else if (pMethod == PartitionMethod.HASH) {
-
- if (pKeys instanceof Keys.FieldPositionKeys) {
-
- int[] logicalKeyPositions = pKeys.computeLogicalKeyPositions();
- UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<IN, IN>(dataSet.getType(), dataSet.getType());
- PartitionOperatorBase<IN> noop = new PartitionOperatorBase<IN>(operatorInfo, pMethod, logicalKeyPositions, name);
- // set input
- noop.setInput(input);
- // set DOP
- noop.setDegreeOfParallelism(partitionDop);
-
- return noop;
- } else if (pKeys instanceof Keys.SelectorFunctionKeys) {
-
- @SuppressWarnings("unchecked")
- Keys.SelectorFunctionKeys<IN, ?> selectorKeys = (Keys.SelectorFunctionKeys<IN, ?>) pKeys;
- MapOperatorBase<?, IN, ?> po = translateSelectorFunctionReducer(selectorKeys, pMethod, dataSet.getType(), name, input, partitionDop);
- return po;
- }
- else {
- throw new UnsupportedOperationException("Unrecognized key type.");
- }
-
- }
- else if (pMethod == PartitionMethod.RANGE) {
- throw new UnsupportedOperationException("Range partitioning not yet supported");
- }
-
- return null;
- }
-
- // --------------------------------------------------------------------------------------------
-
- private static <T, K> MapOperatorBase<Tuple2<K, T>, T, ?> translateSelectorFunctionReducer(Keys.SelectorFunctionKeys<T, ?> rawKeys,
- PartitionMethod pMethod, TypeInformation<T> inputType, String name, Operator<T> input, int partitionDop)
- {
- @SuppressWarnings("unchecked")
- final Keys.SelectorFunctionKeys<T, K> keys = (Keys.SelectorFunctionKeys<T, K>) rawKeys;
-
- TypeInformation<Tuple2<K, T>> typeInfoWithKey = new TupleTypeInfo<Tuple2<K, T>>(keys.getKeyType(), inputType);
- UnaryOperatorInformation<Tuple2<K, T>, Tuple2<K, T>> operatorInfo = new UnaryOperatorInformation<Tuple2<K, T>, Tuple2<K, T>>(typeInfoWithKey, typeInfoWithKey);
-
- KeyExtractingMapper<T, K> extractor = new KeyExtractingMapper<T, K>(keys.getKeyExtractor());
-
- MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>> keyExtractingMap = new MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>>(extractor, new UnaryOperatorInformation<T, Tuple2<K, T>>(inputType, typeInfoWithKey), "Key Extractor");
- PartitionOperatorBase<Tuple2<K, T>> noop = new PartitionOperatorBase<Tuple2<K, T>>(operatorInfo, pMethod, new int[]{0}, name);
- MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>> keyRemovingMap = new MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>>(new KeyRemovingMapper<T, K>(), new UnaryOperatorInformation<Tuple2<K, T>, T>(typeInfoWithKey, inputType), "Key Extractor");
-
- keyExtractingMap.setInput(input);
- noop.setInput(keyExtractingMap);
- keyRemovingMap.setInput(noop);
-
- // set dop
- keyExtractingMap.setDegreeOfParallelism(input.getDegreeOfParallelism());
- noop.setDegreeOfParallelism(partitionDop);
- keyRemovingMap.setDegreeOfParallelism(partitionDop);
-
- return keyRemovingMap;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/01e74da3/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
index 991f4f3..b44c450 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
@@ -208,7 +208,7 @@ public class PartitionITCase extends JavaProgramTestBase {
DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
DataSet<Long> uniqLongs = ds
- .partitionByHash(1)
+ .partitionByHash(1).setParallelism(4)
.mapPartition(new UniqueLongMapper()).setParallelism(4);
uniqLongs.writeAsText(resultPath);