You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2014/09/29 18:49:11 UTC

[4/4] git commit: Change Partition Operator to actual general-purpose Operator

Change the Partition Operator into an actual general-purpose Operator

It is no longer a special-case operator that can only be used in front
of map-style operations (map, flatMap, filter, mapPartition).


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/01e74da3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/01e74da3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/01e74da3

Branch: refs/heads/master
Commit: 01e74da3e8b10dba0afb2606f663f8e37104b283
Parents: f0ed58c
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Sep 26 12:52:08 2014 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Sep 29 18:16:36 2014 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/DataSet.java |  30 +--
 .../flink/api/java/operators/DataSource.java    |  46 +---
 .../api/java/operators/FilterOperator.java      |  14 +-
 .../api/java/operators/FlatMapOperator.java     |  12 -
 .../flink/api/java/operators/MapOperator.java   |  15 +-
 .../java/operators/MapPartitionOperator.java    |  12 -
 .../flink/api/java/operators/Operator.java      |   4 +-
 .../api/java/operators/PartitionOperator.java   | 146 +++++++++++
 .../api/java/operators/PartitionedDataSet.java  | 246 -------------------
 .../test/javaApiOperators/PartitionITCase.java  |   2 +-
 10 files changed, 167 insertions(+), 360 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/01e74da3/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 76363bd..bd28cde 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -20,7 +20,6 @@ package org.apache.flink.api.java;
 
 import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
@@ -58,7 +57,7 @@ import org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.operators.MapOperator;
 import org.apache.flink.api.java.operators.MapPartitionOperator;
-import org.apache.flink.api.java.operators.PartitionedDataSet;
+import org.apache.flink.api.java.operators.PartitionOperator;
 import org.apache.flink.api.java.operators.ProjectOperator.Projection;
 import org.apache.flink.api.java.operators.ReduceOperator;
 import org.apache.flink.api.java.operators.SortedGrouping;
@@ -230,13 +229,13 @@ public abstract class DataSet<T> {
 	 * Initiates a Project transformation on a {@link Tuple} {@link DataSet}.<br/>
 	 * <b>Note: Only Tuple DataSets can be projected.</b></br>
 	 * The transformation projects each Tuple of the DataSet onto a (sub)set of fields.</br>
-	 * This method returns a {@link Projection} on which {@link Projection#types()} needs to
+	 * This method returns a {@link Projection} on which {@link Projection#types} needs to
 	 *   be called to completed the transformation.
 	 * 
 	 * @param fieldIndexes The field indexes of the input tuples that are retained.
 	 * 					   The order of fields in the output tuple corresponds to the order of field indexes.
 	 * @return A Projection that needs to be converted into a {@link org.apache.flink.api.java.operators.ProjectOperator} to complete the 
-	 *           Project transformation by calling {@link Projection#types()}.
+	 *           Project transformation by calling {@link Projection#types}.
 	 * 
 	 * @see Tuple
 	 * @see DataSet
@@ -542,7 +541,7 @@ public abstract class DataSet<T> {
 	 *   joining elements into one DataSet.</br>
 	 * 
 	 * This method returns a {@link JoinOperatorSets} on which 
-	 *   {@link JoinOperatorSets#where()} needs to be called to define the join key of the first 
+	 *   {@link JoinOperatorSets#where} needs to be called to define the join key of the first
 	 *   joining (i.e., this) DataSet.
 	 *  
 	 * @param other The other DataSet with which this DataSet is joined.
@@ -563,7 +562,7 @@ public abstract class DataSet<T> {
 	 * This method also gives the hint to the optimizer that the second DataSet to join is much
 	 *   smaller than the first one.</br>
 	 * This method returns a {@link JoinOperatorSets} on which 
-	 *   {@link JoinOperatorSets#where()} needs to be called to define the join key of the first 
+	 *   {@link JoinOperatorSets#where} needs to be called to define the join key of the first
 	 *   joining (i.e., this) DataSet.
 	 *  
 	 * @param other The other DataSet with which this DataSet is joined.
@@ -584,7 +583,7 @@ public abstract class DataSet<T> {
 	 * This method also gives the hint to the optimizer that the second DataSet to join is much
 	 *   larger than the first one.</br>
 	 * This method returns a {@link JoinOperatorSets JoinOperatorSet} on which 
-	 *   {@link JoinOperatorSets#where()} needs to be called to define the join key of the first 
+	 *   {@link JoinOperatorSets#where} needs to be called to define the join key of the first
 	 *   joining (i.e., this) DataSet.
 	 *  
 	 * @param other The other DataSet with which this DataSet is joined.
@@ -611,7 +610,7 @@ public abstract class DataSet<T> {
 	 * The CoGroupFunction can iterate over the elements of both groups and return any number 
 	 *   of elements including none.</br>
 	 * This method returns a {@link CoGroupOperatorSets} on which 
-	 *   {@link CoGroupOperatorSets#where()} needs to be called to define the grouping key of the first 
+	 *   {@link CoGroupOperatorSets#where} needs to be called to define the grouping key of the first
 	 *   (i.e., this) DataSet.
 	 * 
 	 * @param other The other DataSet of the CoGroup transformation.
@@ -654,7 +653,8 @@ public abstract class DataSet<T> {
 	 * second input being the second field of the tuple.
 	 * 
 	 * <p>
-	 * Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a {@link CrossFunction} which is called for
+	 * Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a
+	 * {@link org.apache.flink.api.common.functions.CrossFunction} which is called for
 	 * each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements.</br>
 	 * 
 	 * @param other The other DataSet with which this DataSet is crossed. 
@@ -862,8 +862,8 @@ public abstract class DataSet<T> {
 	 * @param fields The field indexes on which the DataSet is hash-partitioned.
 	 * @return The partitioned DataSet.
 	 */
-	public PartitionedDataSet<T> partitionByHash(int... fields) {
-		return new PartitionedDataSet<T>(this, PartitionMethod.HASH, new Keys.FieldPositionKeys<T>(fields, getType(), false));
+	public PartitionOperator<T> partitionByHash(int... fields) {
+		return new PartitionOperator<T>(this, PartitionMethod.HASH, new Keys.FieldPositionKeys<T>(fields, getType(), false));
 	}
 	
 	/**
@@ -876,9 +876,9 @@ public abstract class DataSet<T> {
 	 * 
 	 * @see KeySelector
 	 */
-	public <K extends Comparable<K>> PartitionedDataSet<T> partitionByHash(KeySelector<T, K> keyExtractor) {
+	public <K extends Comparable<K>> PartitionOperator<T> partitionByHash(KeySelector<T, K> keyExtractor) {
 		final TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, type);
-		return new PartitionedDataSet<T>(this, PartitionMethod.HASH, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, this.getType(), keyType));
+		return new PartitionOperator<T>(this, PartitionMethod.HASH, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, this.getType(), keyType));
 	}
 	
 	/**
@@ -889,8 +889,8 @@ public abstract class DataSet<T> {
 	 * 
 	 * @return The rebalanced DataSet.
 	 */
-	public PartitionedDataSet<T> rebalance() {
-		return new PartitionedDataSet<T>(this, PartitionMethod.REBALANCE);
+	public PartitionOperator<T> rebalance() {
+		return new PartitionOperator<T>(this, PartitionMethod.REBALANCE);
 	}
 		
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/01e74da3/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
index 45bab86..9e3c4a2 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.operators.OperatorInformation;
 import org.apache.flink.api.common.operators.base.GenericDataSourceBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 
 /**
@@ -32,13 +31,9 @@ import org.apache.flink.api.java.ExecutionEnvironment;
  * 
  * @param <OUT> The type of the elements produced by this data source.
  */
-public class DataSource<OUT> extends DataSet<OUT> {
+public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> {
 	
 	private final InputFormat<OUT, ?> inputFormat;
-	
-	private String name;
-	
-	private int dop = -1;
 
 	// --------------------------------------------------------------------------------------------
 	
@@ -70,45 +65,6 @@ public class DataSource<OUT> extends DataSet<OUT> {
 	
 	// --------------------------------------------------------------------------------------------
 	
-	/**
-	 * Sets the name of the data source operation. The name will be used for logging and other
-	 * messages. The default name is a textual representation of the input format.
-	 * 
-	 * @param name The name for the data source.
-	 * @return The data source object itself, to allow for function call chaining.
-	 */
-	public DataSource<OUT> name(String name) {
-		this.name = name;
-		return this;
-	}
-	
-	/**
-	 * Returns the degree of parallelism of this data source.
-	 * 
-	 * @return The degree of parallelism of this data source.
-	 */
-	public int getParallelism() {
-		return this.dop;
-	}
-	
-	/**
-	 * Sets the degree of parallelism for this data source.
-	 * The degree must be 1 or more.
-	 * 
-	 * @param dop The degree of parallelism for this data source.
-	 * @return This data source with set degree of parallelism.
-	 */
-	public DataSource<OUT> setParallelism(int dop) {
-		if(dop < 1) {
-			throw new IllegalArgumentException("The parallelism of an operator must be at least 1.");
-		}
-		this.dop = dop;
-		
-		return this;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
 	protected GenericDataSourceBase<OUT, ?> translateToDataFlow() {
 		String name = this.name != null ? this.name : this.inputFormat.toString();
 		if (name.length() > 100) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/01e74da3/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
index 727820a..ab8a6c5 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
@@ -33,8 +33,6 @@ import org.apache.flink.api.java.DataSet;
 public class FilterOperator<T> extends SingleInputUdfOperator<T, T, FilterOperator<T>> {
 	
 	protected final FilterFunction<T> function;
-	
-	protected PartitionedDataSet<T> partitionedDataSet;
 
 	public FilterOperator(DataSet<T> input, FilterFunction<T> function) {
 		super(input, input.getType());
@@ -43,19 +41,9 @@ public class FilterOperator<T> extends SingleInputUdfOperator<T, T, FilterOperat
 		extractSemanticAnnotationsFromUdf(function.getClass());
 	}
 	
-	public FilterOperator(PartitionedDataSet<T> input, FilterFunction<T> function) {
-		this(input.getDataSet(), function);
-		this.partitionedDataSet = input;
-	}
-	
 	@Override
 	protected org.apache.flink.api.common.operators.base.FilterOperatorBase<T, FlatMapFunction<T,T>> translateToDataFlow(Operator<T> input) {
-		
-		// inject partition operator if necessary
-		if(this.partitionedDataSet != null) {
-			input = this.partitionedDataSet.translateToDataFlow(input, this.getParallelism());
-		}
-		
+
 		String name = getName() != null ? getName() : function.getClass().getName();
 		// create operator
 		PlanFilterOperator<T> po = new PlanFilterOperator<T>(function, name, getInputType());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/01e74da3/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
index 0dc401e..b7e336f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
@@ -36,8 +36,6 @@ public class FlatMapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, Fl
 	
 	protected final FlatMapFunction<IN, OUT> function;
 	
-	protected PartitionedDataSet<IN> partitionedDataSet;
-	
 	public FlatMapOperator(DataSet<IN> input, TypeInformation<OUT> resultType, FlatMapFunction<IN, OUT> function) {
 		super(input, resultType);
 		
@@ -45,19 +43,9 @@ public class FlatMapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, Fl
 		extractSemanticAnnotationsFromUdf(function.getClass());
 	}
 	
-	public FlatMapOperator(PartitionedDataSet<IN> input, TypeInformation<OUT> resultType, FlatMapFunction<IN, OUT> function) {
-		this(input.getDataSet(), resultType, function);
-		this.partitionedDataSet = input;
-	}
-
 	@Override
 	protected org.apache.flink.api.common.operators.base.FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN,OUT>> translateToDataFlow(Operator<IN> input) {
 		
-		// inject partition operator if necessary
-		if(this.partitionedDataSet != null) {
-			input = this.partitionedDataSet.translateToDataFlow(input, this.getParallelism());
-		}
-		
 		String name = getName() != null ? getName() : function.getClass().getName();
 		// create operator
 		FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>> po = new FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/01e74da3/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
index e45cc96..f1ece2c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
@@ -37,10 +37,7 @@ import org.apache.flink.api.java.DataSet;
 public class MapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapOperator<IN, OUT>> {
 	
 	protected final MapFunction<IN, OUT> function;
-	
-	protected PartitionedDataSet<IN> partitionedDataSet;
-	
-	
+
 	public MapOperator(DataSet<IN> input, TypeInformation<OUT> resultType, MapFunction<IN, OUT> function) {
 
 		super(input, resultType);
@@ -49,19 +46,9 @@ public class MapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapOpe
 		extractSemanticAnnotationsFromUdf(function.getClass());
 	}
 	
-	public MapOperator(PartitionedDataSet<IN> input, TypeInformation<OUT> resultType, MapFunction<IN, OUT> function) {
-		this(input.getDataSet(), resultType, function);
-		this.partitionedDataSet = input;
-	}
-
 	@Override
 	protected org.apache.flink.api.common.operators.base.MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) {
 		
-		// inject partition operator if necessary
-		if(this.partitionedDataSet != null) {
-			input = this.partitionedDataSet.translateToDataFlow(input, this.getParallelism());
-		}
-		
 		String name = getName() != null ? getName() : function.getClass().getName();
 		// create operator
 		MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> po = new MapOperatorBase<IN, OUT, MapFunction<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/01e74da3/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
index 9c896f9..839298b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
@@ -38,8 +38,6 @@ public class MapPartitionOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
 	
 	protected final MapPartitionFunction<IN, OUT> function;
 	
-	protected PartitionedDataSet<IN> partitionedDataSet;
-	
 	public MapPartitionOperator(DataSet<IN> input, TypeInformation<OUT> resultType, MapPartitionFunction<IN, OUT> function) {
 		super(input, resultType);
 		
@@ -47,19 +45,9 @@ public class MapPartitionOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
 		extractSemanticAnnotationsFromUdf(function.getClass());
 	}
 	
-	public MapPartitionOperator(PartitionedDataSet<IN> input, TypeInformation<OUT> resultType, MapPartitionFunction<IN, OUT> function) {
-		this(input.getDataSet(), resultType, function);
-		this.partitionedDataSet = input;
-	}
-
 	@Override
 	protected MapPartitionOperatorBase<IN, OUT, MapPartitionFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) {
 		
-		// inject partition operator if necessary
-		if(this.partitionedDataSet != null) {
-			input = this.partitionedDataSet.translateToDataFlow(input, this.getParallelism());
-		}
-		
 		String name = getName() != null ? getName() : function.getClass().getName();
 		// create operator
 		MapPartitionOperatorBase<IN, OUT, MapPartitionFunction<IN, OUT>> po = new MapPartitionOperatorBase<IN, OUT, MapPartitionFunction<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/01e74da3/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
index 53ff546..0f8a3eb 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
@@ -30,9 +30,9 @@ import org.apache.flink.api.java.ExecutionEnvironment;
  */
 public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<OUT> {
 
-	private String name;
+	protected String name;
 	
-	private int dop = -1;
+	protected int dop = -1;
 
 	protected Operator(ExecutionEnvironment context, TypeInformation<OUT> resultType) {
 		super(context, resultType);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/01e74da3/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
new file mode 100644
index 0000000..f0931b5
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
+import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
+import org.apache.flink.api.java.operators.translation.KeyRemovingMapper;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+
+/**
+ * This operator represents a partitioning.
+ *
+ * @param <T> The type of the data being partitioned.
+ */
+public class PartitionOperator<T> extends SingleInputUdfOperator<T, T, PartitionOperator<T>> {
+	
+	private final Keys<T> pKeys;
+	private final PartitionMethod pMethod;
+	
+	public PartitionOperator(DataSet<T> input, PartitionMethod pMethod, Keys<T> pKeys) {
+		super(input, input.getType());
+
+		if(pMethod == PartitionMethod.HASH && pKeys == null) {
+			throw new IllegalArgumentException("Hash Partitioning requires keys");
+		} else if(pMethod == PartitionMethod.RANGE) {
+			throw new UnsupportedOperationException("Range Partitioning not yet supported");
+		}
+		
+		if(pKeys instanceof Keys.FieldPositionKeys<?> && !input.getType().isTupleType()) {
+			throw new IllegalArgumentException("Hash Partitioning with key fields only possible on Tuple DataSets");
+		}
+		
+		this.pMethod = pMethod;
+		this.pKeys = pKeys;
+	}
+	
+	public PartitionOperator(DataSet<T> input, PartitionMethod pMethod) {
+		this(input, pMethod, null);
+	}
+	
+	/*
+	 * Translation of partitioning
+	 */
+	protected org.apache.flink.api.common.operators.SingleInputOperator<?, T, ?> translateToDataFlow(Operator<T> input) {
+	
+		String name = "Partition";
+		
+		// distinguish between partition types
+		if (pMethod == PartitionMethod.REBALANCE) {
+			
+			UnaryOperatorInformation<T, T> operatorInfo = new UnaryOperatorInformation<T, T>(getType(), getType());
+			PartitionOperatorBase<T> noop = new PartitionOperatorBase<T>(operatorInfo, pMethod, name);
+			// set input
+			noop.setInput(input);
+			// set DOP
+			noop.setDegreeOfParallelism(getParallelism());
+			
+			return noop;
+		} 
+		else if (pMethod == PartitionMethod.HASH) {
+			
+			if (pKeys instanceof Keys.FieldPositionKeys) {
+				
+				int[] logicalKeyPositions = pKeys.computeLogicalKeyPositions();
+				UnaryOperatorInformation<T, T> operatorInfo = new UnaryOperatorInformation<T, T>(getType(), getType());
+				PartitionOperatorBase<T> noop = new PartitionOperatorBase<T>(operatorInfo, pMethod, logicalKeyPositions, name);
+				// set input
+				noop.setInput(input);
+				// set DOP
+				noop.setDegreeOfParallelism(getParallelism());
+				
+				return noop;
+			} else if (pKeys instanceof Keys.SelectorFunctionKeys) {
+				
+				@SuppressWarnings("unchecked")
+				Keys.SelectorFunctionKeys<T, ?> selectorKeys = (Keys.SelectorFunctionKeys<T, ?>) pKeys;
+				MapOperatorBase<?, T, ?> po = translateSelectorFunctionReducer(selectorKeys, pMethod, getType(), name, input, getParallelism());
+				return po;
+			}
+			else {
+				throw new UnsupportedOperationException("Unrecognized key type.");
+			}
+			
+		} 
+		else if (pMethod == PartitionMethod.RANGE) {
+			throw new UnsupportedOperationException("Range partitioning not yet supported");
+		}
+		
+		return null;
+	}
+		
+	// --------------------------------------------------------------------------------------------
+	
+	private static <T, K> MapOperatorBase<Tuple2<K, T>, T, ?> translateSelectorFunctionReducer(Keys.SelectorFunctionKeys<T, ?> rawKeys,
+			PartitionMethod pMethod, TypeInformation<T> inputType, String name, Operator<T> input, int partitionDop)
+	{
+		@SuppressWarnings("unchecked")
+		final Keys.SelectorFunctionKeys<T, K> keys = (Keys.SelectorFunctionKeys<T, K>) rawKeys;
+		
+		TypeInformation<Tuple2<K, T>> typeInfoWithKey = new TupleTypeInfo<Tuple2<K, T>>(keys.getKeyType(), inputType);
+		UnaryOperatorInformation<Tuple2<K, T>, Tuple2<K, T>> operatorInfo = new UnaryOperatorInformation<Tuple2<K, T>, Tuple2<K, T>>(typeInfoWithKey, typeInfoWithKey);
+		
+		KeyExtractingMapper<T, K> extractor = new KeyExtractingMapper<T, K>(keys.getKeyExtractor());
+		
+		MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>> keyExtractingMap = new MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>>(extractor, new UnaryOperatorInformation<T, Tuple2<K, T>>(inputType, typeInfoWithKey), "Key Extractor");
+		PartitionOperatorBase<Tuple2<K, T>> noop = new PartitionOperatorBase<Tuple2<K, T>>(operatorInfo, pMethod, new int[]{0}, name);
+		MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>> keyRemovingMap = new MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>>(new KeyRemovingMapper<T, K>(), new UnaryOperatorInformation<Tuple2<K, T>, T>(typeInfoWithKey, inputType), "Key Extractor");
+
+		keyExtractingMap.setInput(input);
+		noop.setInput(keyExtractingMap);
+		keyRemovingMap.setInput(noop);
+		
+		// set dop
+		keyExtractingMap.setDegreeOfParallelism(input.getDegreeOfParallelism());
+		noop.setDegreeOfParallelism(partitionDop);
+		keyRemovingMap.setDegreeOfParallelism(partitionDop);
+		
+		return keyRemovingMap;
+	}
+
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/01e74da3/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionedDataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionedDataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionedDataSet.java
deleted file mode 100644
index a30be20..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionedDataSet.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.operators;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.MapPartitionFunction;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.UnaryOperatorInformation;
-import org.apache.flink.api.common.operators.base.MapOperatorBase;
-import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
-import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
-import org.apache.flink.api.java.operators.translation.KeyRemovingMapper;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-
-public class PartitionedDataSet<IN> {
-	
-	private final DataSet<IN> dataSet;
-	
-	private final Keys<IN> pKeys;
-	private final PartitionMethod pMethod;
-	
-	public PartitionedDataSet(DataSet<IN> input, PartitionMethod pMethod, Keys<IN> pKeys) {
-		this.dataSet = input;
-		
-		if(pMethod == PartitionMethod.HASH && pKeys == null) {
-			throw new IllegalArgumentException("Hash Partitioning requires keys");
-		} else if(pMethod == PartitionMethod.RANGE) {
-			throw new UnsupportedOperationException("Range Partitioning not yet supported");
-		}
-		
-		if(pKeys instanceof Keys.FieldPositionKeys<?> && !input.getType().isTupleType()) {
-			throw new IllegalArgumentException("Hash Partitioning with key fields only possible on Tuple DataSets");
-		}
-		
-		this.pMethod = pMethod;
-		this.pKeys = pKeys;
-	}
-	
-	public PartitionedDataSet(DataSet<IN> input, PartitionMethod pMethod) {
-		this(input, pMethod, null);
-	}
-	
-	public DataSet<IN> getDataSet() {
-		return this.dataSet;
-	}
-	
-	
-	/**
-	 * Applies a Map transformation on a {@link DataSet}.<br/>
-	 * The transformation calls a {@link org.apache.flink.api.java.functions.RichMapFunction} for each element of the DataSet.
-	 * Each MapFunction call returns exactly one element.
-	 * 
-	 * @param mapper The MapFunction that is called for each element of the DataSet.
-	 * @return A MapOperator that represents the transformed DataSet.
-	 * 
-	 * @see org.apache.flink.api.java.functions.RichMapFunction
-	 * @see MapOperator
-	 * @see DataSet
-	 */
-	public <R> MapOperator<IN, R> map(MapFunction<IN, R> mapper) {
-		if (mapper == null) {
-			throw new NullPointerException("Map function must not be null.");
-		}
-		
-		final TypeInformation<R> resultType = TypeExtractor.getMapReturnTypes(mapper, dataSet.getType());
-		
-		return new MapOperator<IN, R>(this, resultType, mapper);
-	}
-
-	/**
-	 * Applies a Map-style operation to the entire partition of the data.
-	 * The function is called once per parallel partition of the data,
-	 * and the entire partition is available through the given Iterator.
-	 * The number of elements that each instance of the MapPartition function
-	 * sees is non-deterministic and depends on the degree of parallelism of the operation.
-	 *
-	 * This function is intended for operations that cannot transform individual elements
-	 * and require no grouping of elements. To transform individual elements,
-	 * the use of {@code map()} and {@code flatMap()} is preferable.
-	 *
-	 * @param mapPartition The MapPartitionFunction that is called for the full DataSet.
-	 * @return A MapPartitionOperator that represents the transformed DataSet.
-	 *
-	 * @see MapPartitionFunction
-	 * @see MapPartitionOperator
-	 * @see DataSet
-	 */
-	public <R> MapPartitionOperator<IN, R> mapPartition(MapPartitionFunction<IN, R> mapPartition ){
-		if (mapPartition == null) {
-			throw new NullPointerException("MapPartition function must not be null.");
-		}
-		
-		final TypeInformation<R> resultType = TypeExtractor.getMapPartitionReturnTypes(mapPartition, dataSet.getType());
-		
-		return new MapPartitionOperator<IN, R>(this, resultType, mapPartition);
-	}
-	
-	/**
-	 * Applies a FlatMap transformation on a {@link DataSet}.<br/>
-	 * The transformation calls a {@link org.apache.flink.api.java.functions.RichFlatMapFunction} for each element of the DataSet.
-	 * Each FlatMapFunction call can return any number of elements including none.
-	 * 
-	 * @param flatMapper The FlatMapFunction that is called for each element of the DataSet. 
-	 * @return A FlatMapOperator that represents the transformed DataSet.
-	 * 
-	 * @see org.apache.flink.api.java.functions.RichFlatMapFunction
-	 * @see FlatMapOperator
-	 * @see DataSet
-	 */
-	public <R> FlatMapOperator<IN, R> flatMap(FlatMapFunction<IN, R> flatMapper) {
-		if (flatMapper == null) {
-			throw new NullPointerException("FlatMap function must not be null.");
-		}
-		
-		TypeInformation<R> resultType = TypeExtractor.getFlatMapReturnTypes(flatMapper, dataSet.getType());
-		
-		return new FlatMapOperator<IN, R>(this, resultType, flatMapper);
-	}
-	
-	/**
-	 * Applies a Filter transformation on a {@link DataSet}.<br/>
-	 * The transformation calls a {@link org.apache.flink.api.java.functions.RichFilterFunction} for each element of the DataSet
-	 * and retains only those elements for which the function returns true. Elements for 
-	 * which the function returns false are filtered. 
-	 * 
-	 * @param filter The FilterFunction that is called for each element of the DataSet.
-	 * @return A FilterOperator that represents the filtered DataSet.
-	 * 
-	 * @see org.apache.flink.api.java.functions.RichFilterFunction
-	 * @see FilterOperator
-	 * @see DataSet
-	 */
-	public FilterOperator<IN> filter(FilterFunction<IN> filter) {
-		if (filter == null) {
-			throw new NullPointerException("Filter function must not be null.");
-		}
-		return new FilterOperator<IN>(this, filter);
-	}
-	
-	
-	/*
-	 * Translation of partitioning
-	 */
-		
-	protected org.apache.flink.api.common.operators.SingleInputOperator<?, IN, ?> translateToDataFlow(Operator<IN> input, int partitionDop) {
-	
-		String name = "Partition";
-		
-		// distinguish between partition types
-		if (pMethod == PartitionMethod.REBALANCE) {
-			
-			UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<IN, IN>(dataSet.getType(), dataSet.getType());
-			PartitionOperatorBase<IN> noop = new PartitionOperatorBase<IN>(operatorInfo, pMethod, name);
-			// set input
-			noop.setInput(input);
-			// set DOP
-			noop.setDegreeOfParallelism(partitionDop);
-			
-			return noop;
-		} 
-		else if (pMethod == PartitionMethod.HASH) {
-			
-			if (pKeys instanceof Keys.FieldPositionKeys) {
-				
-				int[] logicalKeyPositions = pKeys.computeLogicalKeyPositions();
-				UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<IN, IN>(dataSet.getType(), dataSet.getType());
-				PartitionOperatorBase<IN> noop = new PartitionOperatorBase<IN>(operatorInfo, pMethod, logicalKeyPositions, name);
-				// set input
-				noop.setInput(input);
-				// set DOP
-				noop.setDegreeOfParallelism(partitionDop);
-				
-				return noop;
-			} else if (pKeys instanceof Keys.SelectorFunctionKeys) {
-				
-				@SuppressWarnings("unchecked")
-				Keys.SelectorFunctionKeys<IN, ?> selectorKeys = (Keys.SelectorFunctionKeys<IN, ?>) pKeys;
-				MapOperatorBase<?, IN, ?> po = translateSelectorFunctionReducer(selectorKeys, pMethod, dataSet.getType(), name, input, partitionDop);
-				return po;
-			}
-			else {
-				throw new UnsupportedOperationException("Unrecognized key type.");
-			}
-			
-		} 
-		else if (pMethod == PartitionMethod.RANGE) {
-			throw new UnsupportedOperationException("Range partitioning not yet supported");
-		}
-		
-		return null;
-	}
-		
-	// --------------------------------------------------------------------------------------------
-	
-	private static <T, K> MapOperatorBase<Tuple2<K, T>, T, ?> translateSelectorFunctionReducer(Keys.SelectorFunctionKeys<T, ?> rawKeys,
-			PartitionMethod pMethod, TypeInformation<T> inputType, String name, Operator<T> input, int partitionDop)
-	{
-		@SuppressWarnings("unchecked")
-		final Keys.SelectorFunctionKeys<T, K> keys = (Keys.SelectorFunctionKeys<T, K>) rawKeys;
-		
-		TypeInformation<Tuple2<K, T>> typeInfoWithKey = new TupleTypeInfo<Tuple2<K, T>>(keys.getKeyType(), inputType);
-		UnaryOperatorInformation<Tuple2<K, T>, Tuple2<K, T>> operatorInfo = new UnaryOperatorInformation<Tuple2<K, T>, Tuple2<K, T>>(typeInfoWithKey, typeInfoWithKey);
-		
-		KeyExtractingMapper<T, K> extractor = new KeyExtractingMapper<T, K>(keys.getKeyExtractor());
-		
-		MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>> keyExtractingMap = new MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>>(extractor, new UnaryOperatorInformation<T, Tuple2<K, T>>(inputType, typeInfoWithKey), "Key Extractor");
-		PartitionOperatorBase<Tuple2<K, T>> noop = new PartitionOperatorBase<Tuple2<K, T>>(operatorInfo, pMethod, new int[]{0}, name);
-		MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>> keyRemovingMap = new MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>>(new KeyRemovingMapper<T, K>(), new UnaryOperatorInformation<Tuple2<K, T>, T>(typeInfoWithKey, inputType), "Key Extractor");
-
-		keyExtractingMap.setInput(input);
-		noop.setInput(keyExtractingMap);
-		keyRemovingMap.setInput(noop);
-		
-		// set dop
-		keyExtractingMap.setDegreeOfParallelism(input.getDegreeOfParallelism());
-		noop.setDegreeOfParallelism(partitionDop);
-		keyRemovingMap.setDegreeOfParallelism(partitionDop);
-		
-		return keyRemovingMap;
-	}
-
-	
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/01e74da3/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
index 991f4f3..b44c450 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
@@ -208,7 +208,7 @@ public class PartitionITCase extends JavaProgramTestBase {
 				
 				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
 				DataSet<Long> uniqLongs = ds
-						.partitionByHash(1)
+						.partitionByHash(1).setParallelism(4)
 						.mapPartition(new UniqueLongMapper()).setParallelism(4);
 				uniqLongs.writeAsText(resultPath);