You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/15 13:08:43 UTC

[1/3] git commit: Small bug fixes for running hadoop output formats

Repository: incubator-flink
Updated Branches:
  refs/heads/master a87559aee -> 653216ba8


Small bug fixes for running hadoop output formats

This closes #75


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/653216ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/653216ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/653216ba

Branch: refs/heads/master
Commit: 653216ba83910ead811cfddf525b0448325882a3
Parents: d0cead7
Author: twalthr <in...@twalthr.com>
Authored: Mon Jul 21 12:11:16 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Aug 14 22:43:26 2014 +0200

----------------------------------------------------------------------
 .../mapred/HadoopOutputFormat.java                  | 10 ++++++----
 .../mapreduce/HadoopOutputFormat.java               | 16 ++++++++++------
 2 files changed, 16 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/653216ba/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
index deae026..69a0b50 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
@@ -100,16 +100,18 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
 				+ Integer.toString(taskNumber + 1) 
 				+ "_0");
 		
+		this.jobConf.set("mapred.task.id", taskAttemptID.toString());
+		this.jobConf.setInt("mapred.task.partition", taskNumber + 1);
+		// for hadoop 2.2
+		this.jobConf.set("mapreduce.task.attempt.id", taskAttemptID.toString());
+		this.jobConf.setInt("mapreduce.task.partition", taskNumber + 1);
+		
 		try {
 			this.context = HadoopUtils.instantiateTaskAttemptContext(this.jobConf, taskAttemptID);
 		} catch (Exception e) {
 			throw new RuntimeException(e);
 		}
 		
-		this.jobConf.set("mapred.task.id", taskAttemptID.toString());
-		// for hadoop 2.2
-		this.jobConf.set("mapreduce.task.attempt.id", taskAttemptID.toString());
-		
 		this.fileOutputCommitter = new FileOutputCommitter();
 		
 		try {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/653216ba/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
index 9eabc03..f071eda 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
@@ -103,14 +103,17 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
 				+ Integer.toString(taskNumber + 1) 
 				+ "_0");
 		
+		this.configuration.set("mapred.task.id", taskAttemptID.toString());
+		this.configuration.setInt("mapred.task.partition", taskNumber + 1);
+		// for hadoop 2.2
+		this.configuration.set("mapreduce.task.attempt.id", taskAttemptID.toString());
+		this.configuration.setInt("mapreduce.task.partition", taskNumber + 1);
+		
 		try {
 			this.context = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
 		} catch (Exception e) {
 			throw new RuntimeException(e);
 		}
-		this.configuration.set("mapred.task.id", taskAttemptID.toString());
-		// for hadoop 2.2
-		this.configuration.set("mapreduce.task.attempt.id", taskAttemptID.toString());
 		
 		this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), context);
 		
@@ -157,11 +160,12 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
 		}
 		this.fileOutputCommitter.commitJob(this.context);
 		
-		// rename tmp-* files to final name
-		FileSystem fs = FileSystem.get(this.configuration);
 		
 		Path outputPath = new Path(this.configuration.get("mapred.output.dir"));
-
+		
+		// rename tmp-* files to final name
+		FileSystem fs = FileSystem.get(outputPath.toUri(), this.configuration);
+		
 		final Pattern p = Pattern.compile("tmp-(.)-([0-9]+)");
 		
 		// isDirectory does not work in hadoop 1


[3/3] git commit: [FLINK-1053] Add "mapPartition" operator.

Posted by se...@apache.org.
[FLINK-1053] Add "mapPartition" operator.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d4de9774
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d4de9774
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d4de9774

Branch: refs/heads/master
Commit: d4de9774b3237bb1850024b1208640bc50f7adab
Parents: a87559a
Author: kay <fl...@googlemail.com>
Authored: Wed Jun 25 11:57:45 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Aug 14 22:43:26 2014 +0200

----------------------------------------------------------------------
 .../org/apache/flink/compiler/PactCompiler.java |   4 +-
 .../flink/compiler/costs/CostEstimator.java     |   1 +
 .../java/org/apache/flink/api/java/DataSet.java |  27 +++
 .../flink/runtime/operators/DriverStrategy.java |   4 +
 .../compiler/dag/MapPartitionNode.java          |  56 ++++++
 .../operators/MapPartitionDescriptor.java       |  60 ++++++
 .../common/functions/GenericMapPartition.java   |  16 ++
 .../base/MapPartitionOperatorBase.java          |  43 +++++
 .../java/functions/MapPartitionFunction.java    |  36 ++++
 .../java/operators/MapPartitionOperator.java    |  67 +++++++
 .../record/functions/MapPartitionFunction.java  |  44 +++++
 .../record/operators/MapPartitionOperator.java  | 190 +++++++++++++++++++
 .../pact/runtime/task/MapPartitionDriver.java   |  92 +++++++++
 .../test/operators/MapPartitionITCase.java      | 130 +++++++++++++
 14 files changed, 769 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
index e22a365..f16a0a2 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler;
 
 import java.util.ArrayDeque;
@@ -671,6 +670,9 @@ public class PactCompiler {
 			else if (c instanceof MapOperatorBase) {
 				n = new MapNode((MapOperatorBase<?, ?, ?>) c);
 			}
+			else if (c instanceof MapPartitionOperatorBase) {
+				n = new MapPartitionNode((MapPartitionOperatorBase<?, ?, ?>) c);
+			}
 			else if (c instanceof org.apache.flink.api.common.operators.base.CollectorMapOperatorBase) {
 				n = new CollectorMapNode((org.apache.flink.api.common.operators.base.CollectorMapOperatorBase<?, ?, ?>) c);
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/flink-compiler/src/main/java/org/apache/flink/compiler/costs/CostEstimator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/costs/CostEstimator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/costs/CostEstimator.java
index b09f82f..cf11424 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/costs/CostEstimator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/costs/CostEstimator.java
@@ -168,6 +168,7 @@ public abstract class CostEstimator {
 		case BINARY_NO_OP:	
 		case COLLECTOR_MAP:
 		case MAP:
+		case MAP_PARTITION:
 		case FLAT_MAP:
 			
 		case ALL_GROUP_REDUCE:

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 5da572d..2b15007 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -140,6 +140,33 @@ public abstract class DataSet<T> {
 		}
 		return new MapOperator<T, R>(this, mapper);
 	}
+
+
+
+	/**
+	 * Applies a MapPartition transformation on a {@link DataSet}.<br/>
+	 * The function is called once per parallel partition of the data,
+	 * and the entire partition is available through the given Iterator.
+	 * The number of elements that each instance of the MapPartition function
+	 * sees is non-deterministic and depends on the degree of parallelism of the operation.
+	 *
+	 * This function is intended for operations that cannot transform individual elements
+	 * and that require no grouping of elements. To transform individual elements,
+	 * the use of {@code map()} and {@code flatMap()} is preferable.
+	 *
+	 * @param mapPartition The MapPartitionFunction that is called once for each parallel partition of the DataSet.
+	 * @return A MapPartitionOperator that represents the transformed DataSet.
+	 * @throws NullPointerException If {@code mapPartition} is null.
+	 * @see MapPartitionFunction
+	 * @see MapPartitionOperator
+	 * @see DataSet
+	 */
+	public <R> MapPartitionOperator<T, R> mapPartition(MapPartitionFunction<T, R> mapPartition ){
+		if (mapPartition == null) {
+			throw new NullPointerException("MapPartition function must not be null.");
+		}
+		return new MapPartitionOperator<T, R>(this, mapPartition);
+	}
 	
 	/**
 	 * Applies a FlatMap transformation on a {@link DataSet}.<br/>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
index 5f00277..bb14539 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
@@ -44,6 +44,10 @@ public enum DriverStrategy {
 	COLLECTOR_MAP(CollectorMapDriver.class, ChainedCollectorMapDriver.class, PIPELINED, false),
 	// the proper mapper
 	MAP(MapDriver.class, ChainedMapDriver.class, PIPELINED, false),
+
+	// the proper map partition
+	MAP_PARTITION(MapPartitionDriver.class, null, PIPELINED, false),
+
 	// the flat mapper
 	FLAT_MAP(FlatMapDriver.class, ChainedFlatMapDriver.class, PIPELINED, false),
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/MapPartitionNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/MapPartitionNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/MapPartitionNode.java
new file mode 100644
index 0000000..0d5e4ad
--- /dev/null
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/MapPartitionNode.java
@@ -0,0 +1,56 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.compiler.dag;
+
+import java.util.Collections;
+import java.util.List;
+
+import eu.stratosphere.api.common.operators.SingleInputOperator;
+import eu.stratosphere.compiler.DataStatistics;
+import eu.stratosphere.compiler.operators.MapPartitionDescriptor;
+import eu.stratosphere.compiler.operators.OperatorDescriptorSingle;
+
+/**
+ * The optimizer's internal representation of a <i>MapPartition</i> operator node.
+ */
+public class MapPartitionNode extends SingleInputNode {
+	
+	/**
+	 * Creates a new MapPartitionNode for the given contract.
+	 * 
+	 * @param operator The map partition contract object.
+	 */
+	public MapPartitionNode(SingleInputOperator<?, ?, ?> operator) {
+		super(operator);
+	}
+
+	@Override
+	public String getName() {
+		return "MapPartition";
+	}
+	/** Offers a single execution strategy, described by {@link MapPartitionDescriptor}. */
+	@Override
+	protected List<OperatorDescriptorSingle> getPossibleProperties() {
+		return Collections.<OperatorDescriptorSingle>singletonList(new MapPartitionDescriptor());
+	}
+
+	/**
+	 * Computes the operator-specific default estimates for the MapPartition operator.
+	 * This implementation sets no estimates: unlike Map, a MapPartition function may emit any
+	 * number of records per partition, so the output cardinality is not assumed to equal the input's.
+	 */
+	@Override
+	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/MapPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/MapPartitionDescriptor.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/MapPartitionDescriptor.java
new file mode 100644
index 0000000..41b707d
--- /dev/null
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/MapPartitionDescriptor.java
@@ -0,0 +1,60 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.compiler.operators;
+
+import java.util.Collections;
+import java.util.List;
+
+import eu.stratosphere.compiler.dag.SingleInputNode;
+import eu.stratosphere.compiler.dataproperties.GlobalProperties;
+import eu.stratosphere.compiler.dataproperties.LocalProperties;
+import eu.stratosphere.compiler.dataproperties.RequestedGlobalProperties;
+import eu.stratosphere.compiler.dataproperties.RequestedLocalProperties;
+import eu.stratosphere.compiler.plan.Channel;
+import eu.stratosphere.compiler.plan.SingleInputPlanNode;
+import eu.stratosphere.pact.runtime.task.DriverStrategy;
+
+/** Optimizer descriptor for the MapPartition operator, executed with {@link DriverStrategy#MAP_PARTITION}. */
+public class MapPartitionDescriptor extends OperatorDescriptorSingle {
+
+	@Override
+	public DriverStrategy getStrategy() {
+		return DriverStrategy.MAP_PARTITION;
+	}
+	// The plan node is labeled "MapPartition (<operator name>)".
+	@Override
+	public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
+		return new SingleInputPlanNode(node, "MapPartition ("+node.getPactContract().getName()+")", in, DriverStrategy.MAP_PARTITION);
+	}
+	// Offers one choice: a default RequestedGlobalProperties (presumably "no requirement" - confirm).
+	@Override
+	protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
+		return Collections.singletonList(new RequestedGlobalProperties());
+	}
+	// Offers one choice: a default RequestedLocalProperties (presumably "no requirement" - confirm).
+	@Override
+	protected List<RequestedLocalProperties> createPossibleLocalProperties() {
+		return Collections.singletonList(new RequestedLocalProperties());
+	}
+	// Forwards the input's global properties unchanged. NOTE(review): the UDF may rewrite records freely; confirm this is safe.
+	@Override
+	public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
+		return gProps;
+	}
+	// Forwards the input's local properties unchanged. NOTE(review): confirm this is safe for an arbitrary UDF.
+	@Override
+	public LocalProperties computeLocalProperties(LocalProperties lProps) {
+		return lProps;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericMapPartition.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericMapPartition.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericMapPartition.java
new file mode 100644
index 0000000..accf731
--- /dev/null
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericMapPartition.java
@@ -0,0 +1,16 @@
+package eu.stratosphere.api.common.functions;
+
+import eu.stratosphere.util.Collector;
+
+import java.util.Iterator;
+
+public interface GenericMapPartition<T, O> extends Function {
+	/**
+	 * A user-implemented function that transforms all records of one parallel partition.
+	 *
+	 * @param records An iterator over all records of the partition.
+	 * @param out The collector to hand results to; any number of records may be emitted.
+	 * @throws Exception Implementations may forward exceptions to the runtime.
+	 */
+	void mapPartition(Iterator<T> records, Collector<O> out) throws Exception;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/base/MapPartitionOperatorBase.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/base/MapPartitionOperatorBase.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/base/MapPartitionOperatorBase.java
new file mode 100644
index 0000000..9e3552a
--- /dev/null
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/base/MapPartitionOperatorBase.java
@@ -0,0 +1,43 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.api.common.operators.base;
+
+import eu.stratosphere.api.common.functions.GenericMapPartition;
+import eu.stratosphere.api.common.operators.SingleInputOperator;
+import eu.stratosphere.api.common.operators.UnaryOperatorInformation;
+import eu.stratosphere.api.common.operators.util.UserCodeClassWrapper;
+import eu.stratosphere.api.common.operators.util.UserCodeObjectWrapper;
+import eu.stratosphere.api.common.operators.util.UserCodeWrapper;
+
+
+/**
+ * Base operator for the MapPartition transformation, wrapping a {@link GenericMapPartition} user function.
+ * @param <IN> The input type.
+ * @param <OUT> The result type.
+ * @param <FT> The type of the user-defined function.
+ */
+public class MapPartitionOperatorBase<IN, OUT, FT extends GenericMapPartition<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
+	/** Creates the operator from an existing user-code wrapper around the function. */
+	public MapPartitionOperatorBase(UserCodeWrapper<FT> udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {
+		super(udf, operatorInfo, name);
+	}
+	/** Creates the operator from a function instance, wrapped in a UserCodeObjectWrapper. */
+	public MapPartitionOperatorBase(FT udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {
+		super(new UserCodeObjectWrapper<FT>(udf), operatorInfo, name);
+	}
+	/** Creates the operator from a function class, wrapped in a UserCodeClassWrapper. */
+	public MapPartitionOperatorBase(Class<? extends FT> udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {
+		super(new UserCodeClassWrapper<FT>(udf), operatorInfo, name);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/MapPartitionFunction.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/MapPartitionFunction.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/MapPartitionFunction.java
new file mode 100644
index 0000000..4c0155f
--- /dev/null
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/MapPartitionFunction.java
@@ -0,0 +1,36 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.api.java.functions;
+
+
+import eu.stratosphere.api.common.functions.AbstractFunction;
+import eu.stratosphere.api.common.functions.GenericMapPartition;
+import eu.stratosphere.util.Collector;
+
+import java.util.Iterator;
+
+public abstract class MapPartitionFunction<IN, OUT> extends AbstractFunction implements GenericMapPartition<IN, OUT> {
+
+	private static final long serialVersionUID = 1L;
+	/**
+	 * The core method of the MapPartitionFunction, called once per parallel partition with all of its records.
+	 * @param records An iterator over all records of the partition.
+	 * @param out The collector to hand results to.
+	 *
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+	 *                   to fail and may trigger recovery.
+	 */
+	@Override
+	public abstract void mapPartition(Iterator<IN> records, Collector<OUT> out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/MapPartitionOperator.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/MapPartitionOperator.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/MapPartitionOperator.java
new file mode 100644
index 0000000..836b205
--- /dev/null
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/MapPartitionOperator.java
@@ -0,0 +1,67 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+package eu.stratosphere.api.java.operators;
+
+import eu.stratosphere.api.common.functions.GenericMapPartition;
+import eu.stratosphere.api.common.operators.Operator;
+import eu.stratosphere.api.common.operators.UnaryOperatorInformation;
+import eu.stratosphere.api.common.operators.base.MapPartitionOperatorBase;
+import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.functions.MapFunction;
+import eu.stratosphere.api.java.functions.MapPartitionFunction;
+import eu.stratosphere.api.java.typeutils.TypeExtractor;
+
+/**
+ * This operator represents the application of a "mapPartition" function on a data set, and the
+ * result data set produced by the function.
+ * 
+ * @param <IN> The type of the data set consumed by the operator.
+ * @param <OUT> The type of the data set created by the operator.
+ * @see MapPartitionFunction
+ * @see MapFunction
+ */
+public class MapPartitionOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapPartitionOperator<IN, OUT>> {
+	/** The user function applied to each parallel partition of the input. */
+	protected final MapPartitionFunction<IN, OUT> function;
+	
+	/** Creates the operator; the result type is derived from the function via the TypeExtractor. */
+	public MapPartitionOperator(DataSet<IN> input, MapPartitionFunction<IN, OUT> function) {
+		super(input, TypeExtractor.getMapPartitionReturnTypes(function, input.getType()));
+		
+		this.function = function;
+		extractSemanticAnnotationsFromUdf(function.getClass());
+	}
+	/** Translates this API operator into a MapPartitionOperatorBase of the common data flow. */
+	@Override
+	protected eu.stratosphere.api.common.operators.base.MapPartitionOperatorBase<IN, OUT, GenericMapPartition<IN, OUT>> translateToDataFlow(Operator<IN> input) {
+		// fall back to the function's class name when no explicit name was set
+		String name = getName() != null ? getName() : function.getClass().getName();
+		// create operator
+		MapPartitionOperatorBase<IN, OUT, GenericMapPartition<IN, OUT>> po = new MapPartitionOperatorBase<IN, OUT, GenericMapPartition<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
+		// set input
+		po.setInput(input);
+		// set dop
+		if(this.getParallelism() > 0) {
+			// use specified dop
+			po.setDegreeOfParallelism(this.getParallelism());
+		} else {
+			// if no dop has been specified, use dop of input operator (NOTE(review): DriverStrategy.MAP_PARTITION registers no chained driver, so this alone does not enable chaining)
+			po.setDegreeOfParallelism(input.getDegreeOfParallelism());
+		}
+		
+		return po;
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/MapPartitionFunction.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/MapPartitionFunction.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/MapPartitionFunction.java
new file mode 100644
index 0000000..5284d06
--- /dev/null
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/MapPartitionFunction.java
@@ -0,0 +1,44 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.api.java.record.functions;
+
+import eu.stratosphere.api.common.functions.AbstractFunction;
+import eu.stratosphere.api.common.functions.GenericMapPartition;
+import eu.stratosphere.types.Record;
+import eu.stratosphere.util.Collector;
+
+import java.util.Iterator;
+
+/**
+ * The MapPartitionFunction must be extended to provide a map partition implementation for the Record API.
+ * The function is called once per parallel partition, with all records of that partition.
+ */
+public abstract class MapPartitionFunction extends AbstractFunction implements GenericMapPartition<Record, Record> {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * This method must be implemented to provide a user implementation of a map partition function.
+	 * It is called once per parallel partition, with all records of that partition.
+	 *
+	 * @param records An iterator over all input records of the partition.
+	 * @param out A collector that collects all output records.
+	 *
+	 * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the
+	 *                   runtime catches an exception, it aborts the map task and lets the fail-over logic
+	 *                   decide whether to retry the mapper execution.
+	 */
+	@Override
+	public abstract void mapPartition(Iterator<Record> records, Collector<Record> out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/MapPartitionOperator.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/MapPartitionOperator.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/MapPartitionOperator.java
new file mode 100644
index 0000000..1ac132c
--- /dev/null
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/MapPartitionOperator.java
@@ -0,0 +1,190 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.api.java.record.operators;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import eu.stratosphere.api.common.operators.base.MapPartitionOperatorBase;
+import eu.stratosphere.api.java.record.functions.MapPartitionFunction;
+import org.apache.commons.lang3.Validate;
+
+
+
+import eu.stratosphere.api.common.operators.Operator;
+import eu.stratosphere.api.common.operators.RecordOperator;
+import eu.stratosphere.api.common.operators.util.UserCodeClassWrapper;
+import eu.stratosphere.api.common.operators.util.UserCodeObjectWrapper;
+import eu.stratosphere.api.common.operators.util.UserCodeWrapper;
+import eu.stratosphere.api.java.record.functions.FunctionAnnotation;
+import eu.stratosphere.types.Key;
+import eu.stratosphere.types.Record;
+
+/**
+ * MapPartitionOperator that applies a {@link MapPartitionFunction} to a whole partition of records:
+ * the function receives an iterator over all records of its input instead of being invoked once
+ * per record.
+ * 
+ * @see MapPartitionFunction
+ */
+public class MapPartitionOperator extends MapPartitionOperatorBase<Record, Record, MapPartitionFunction> implements RecordOperator {
+	
+	/** Name used when no explicit name is given to the builder. Final so it cannot be reassigned globally. */
+	private static final String DEFAULT_NAME = "<Unnamed MapPartition>";
+	
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Creates a Builder with the provided {@link MapPartitionFunction} instance.
+	 * 
+	 * @param udf The {@link MapPartitionFunction} instance for this MapPartition operator.
+	 * @return A new builder wrapping the given function instance.
+	 */
+	public static Builder builder(MapPartitionFunction udf) {
+		return new Builder(new UserCodeObjectWrapper<MapPartitionFunction>(udf));
+	}
+	
+	/**
+	 * Creates a Builder with the provided {@link MapPartitionFunction} implementation class.
+	 * 
+	 * @param udf The {@link MapPartitionFunction} class for this MapPartition operator.
+	 * @return A new builder wrapping the given function class.
+	 */
+	public static Builder builder(Class<? extends MapPartitionFunction> udf) {
+		return new Builder(new UserCodeClassWrapper<MapPartitionFunction>(udf));
+	}
+	
+	/**
+	 * The constructor that is invoked from {@link Builder#build()}.
+	 * 
+	 * @param builder The builder holding the configuration of this operator.
+	 */
+	protected MapPartitionOperator(Builder builder) {
+		super(builder.udf, OperatorInfoHelper.unary(), builder.name);
+		
+		// several inputs are combined into a single union input
+		if (builder.inputs != null && !builder.inputs.isEmpty()) {
+			setInput(Operator.createUnionCascade(builder.inputs));
+		}
+		
+		setBroadcastVariables(builder.broadcastInputs);
+		setSemanticProperties(FunctionAnnotation.readSingleConstantAnnotations(builder.udf));
+	}
+	
+	@Override
+	public Class<? extends Key<?>>[] getKeyClasses() {
+		// a map partition operator does not work on keys
+		return emptyClassArray();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Builder pattern, straight from Joshua Bloch's Effective Java (2nd Edition).
+	 */
+	public static class Builder {
+		
+		/* The required parameters */
+		private final UserCodeWrapper<MapPartitionFunction> udf;
+		
+		/* The optional parameters */
+		private final List<Operator<Record>> inputs;
+		private final Map<String, Operator<Record>> broadcastInputs;
+		private String name = DEFAULT_NAME;
+		
+		/**
+		 * Creates a Builder with the provided {@link MapPartitionFunction} implementation.
+		 * 
+		 * @param udf The wrapped {@link MapPartitionFunction} for this MapPartition operator.
+		 */
+		private Builder(UserCodeWrapper<MapPartitionFunction> udf) {
+			this.udf = udf;
+			this.inputs = new ArrayList<Operator<Record>>();
+			this.broadcastInputs = new HashMap<String, Operator<Record>>();
+		}
+		
+		/**
+		 * Sets the input, replacing any previously set inputs.
+		 * 
+		 * @param input The input; must not be null.
+		 * @return This builder.
+		 */
+		public Builder input(Operator<Record> input) {
+			Validate.notNull(input, "The input must not be null");
+			
+			this.inputs.clear();
+			this.inputs.add(input);
+			return this;
+		}
+		
+		/**
+		 * Sets one or several inputs (union), replacing any previously set inputs.
+		 * 
+		 * @param inputs The inputs; must not contain null.
+		 * @return This builder.
+		 */
+		public Builder input(Operator<Record>...inputs) {
+			// validate before clearing, consistent with the single-input variant,
+			// so that a rejected call leaves the builder unchanged
+			Validate.noNullElements(inputs, "The inputs must not contain null");
+			
+			this.inputs.clear();
+			for (Operator<Record> c : inputs) {
+				this.inputs.add(c);
+			}
+			return this;
+		}
+		
+		/**
+		 * Sets the inputs, replacing any previously set inputs. The given list is copied.
+		 * A {@code null} list clears the inputs.
+		 * 
+		 * @param inputs The inputs.
+		 * @return This builder.
+		 */
+		public Builder inputs(List<Operator<Record>> inputs) {
+			// Copy instead of aliasing: later input(...) calls clear the builder's list, which
+			// would otherwise mutate the caller's list or fail on unmodifiable/fixed-size lists.
+			this.inputs.clear();
+			if (inputs != null) {
+				this.inputs.addAll(inputs);
+			}
+			return this;
+		}
+		
+		/**
+		 * Binds the result produced by the plan rooted at {@code input} to the broadcast
+		 * variable {@code name} used by the UDF wrapped in this operator.
+		 * 
+		 * @param name The name of the broadcast variable.
+		 * @param input The operator producing the broadcast data.
+		 * @return This builder.
+		 */
+		public Builder setBroadcastVariable(String name, Operator<Record> input) {
+			this.broadcastInputs.put(name, input);
+			return this;
+		}
+		
+		/**
+		 * Binds multiple broadcast variables, replacing any previously bound ones.
+		 * 
+		 * @param inputs Mapping from broadcast variable name to the operator producing its data.
+		 * @return This builder.
+		 */
+		public Builder setBroadcastVariables(Map<String, Operator<Record>> inputs) {
+			this.broadcastInputs.clear();
+			this.broadcastInputs.putAll(inputs);
+			return this;
+		}
+		
+		/**
+		 * Sets the name of this operator. If set to {@code null}, {@link #build()} falls back
+		 * to the name of the user function class.
+		 * 
+		 * @param name The operator name.
+		 * @return This builder.
+		 */
+		public Builder name(String name) {
+			this.name = name;
+			return this;
+		}
+		
+		/**
+		 * Creates and returns a MapPartitionOperator from the values given to the builder.
+		 * 
+		 * @return The created operator
+		 */
+		public MapPartitionOperator build() {
+			if (name == null) {
+				name = udf.getUserCodeClass().getName();
+			}
+			return new MapPartitionOperator(this);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java
new file mode 100644
index 0000000..0ec5f15
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java
@@ -0,0 +1,92 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.pact.runtime.task;
+
+import eu.stratosphere.api.common.functions.GenericMapPartition;
+import eu.stratosphere.pact.runtime.util.MutableToRegularIteratorWrapper;
+import eu.stratosphere.util.Collector;
+import eu.stratosphere.util.MutableObjectIterator;
+
+
+/**
+ * MapPartition task which is executed by a Nephele task manager. The task has a single
+ * input and one or multiple outputs. It is provided with a {@link GenericMapPartition}
+ * implementation.
+ * <p>
+ * The MapPartitionDriver wraps its input in an iterator over all records and hands that
+ * iterator to a single invocation of the function's <code>mapPartition()</code> method.
+ *
+ * @see GenericMapPartition
+ *
+ * @param <IT> The function's input data type.
+ * @param <OT> The function's output data type.
+ */
+public class MapPartitionDriver<IT, OT> implements PactDriver<GenericMapPartition<IT, OT>, OT> {
+
+	private PactTaskContext<GenericMapPartition<IT, OT>, OT> taskContext;
+
+	// Set in setup() and cleared in cancel().
+	// NOTE(review): run() never reads this flag, so cancel() does not interrupt an ongoing
+	// mapPartition() call from within this driver.
+	private volatile boolean running;
+
+
+	@Override
+	public void setup(PactTaskContext<GenericMapPartition<IT, OT>, OT> context) {
+		this.taskContext = context;
+		this.running = true;
+	}
+
+	@Override
+	public int getNumberOfInputs() {
+		return 1;
+	}
+
+	@Override
+	public Class<GenericMapPartition<IT, OT>> getStubType() {
+		@SuppressWarnings("unchecked")
+		final Class<GenericMapPartition<IT, OT>> clazz = (Class<GenericMapPartition<IT, OT>>) (Class<?>) GenericMapPartition.class;
+		return clazz;
+	}
+
+	@Override
+	public boolean requiresComparatorOnInput() {
+		return false;
+	}
+
+	@Override
+	public void prepare() {
+		// nothing, since a map partition does not need any preparation
+	}
+
+	/**
+	 * Wraps the single input in a regular iterator and calls the function once with it.
+	 */
+	@Override
+	public void run() throws Exception {
+		// cache references on the stack
+		final MutableObjectIterator<IT> input = this.taskContext.getInput(0);
+		final GenericMapPartition<IT, OT> function = this.taskContext.getStub();
+		final Collector<OT> output = this.taskContext.getOutputCollector();
+
+		// expose the mutable input as a regular iterator over all records of the partition
+		final MutableToRegularIteratorWrapper<IT> inIter = new MutableToRegularIteratorWrapper<IT>(
+				input, this.taskContext.<IT>getInputSerializer(0).getSerializer());
+
+		function.mapPartition(inIter, output);
+	}
+
+	@Override
+	public void cleanup() {
+		// no cleanup needed, since no strategies are used.
+	}
+
+	@Override
+	public void cancel() {
+		this.running = false;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/stratosphere-tests/src/test/java/eu/stratosphere/test/operators/MapPartitionITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/operators/MapPartitionITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/operators/MapPartitionITCase.java
new file mode 100644
index 0000000..5da0f99
--- /dev/null
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/operators/MapPartitionITCase.java
@@ -0,0 +1,130 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.test.operators;
+
+import eu.stratosphere.api.common.Plan;
+import eu.stratosphere.api.java.record.operators.FileDataSink;
+import eu.stratosphere.api.java.record.operators.FileDataSource;
+import eu.stratosphere.api.java.record.functions.MapPartitionFunction;
+import eu.stratosphere.api.java.record.io.DelimitedInputFormat;
+import eu.stratosphere.api.java.record.operators.MapPartitionOperator;
+import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.test.operators.io.ContractITCaseIOFormats.ContractITCaseInputFormat;
+import eu.stratosphere.test.operators.io.ContractITCaseIOFormats.ContractITCaseOutputFormat;
+import eu.stratosphere.test.util.RecordAPITestBase;
+import eu.stratosphere.types.IntValue;
+import eu.stratosphere.types.Record;
+import eu.stratosphere.types.StringValue;
+import eu.stratosphere.util.Collector;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+@RunWith(Parameterized.class)
+public class MapPartitionITCase extends RecordAPITestBase {
+	
+	// log under this test's own class (previously copy-pasted as MapITCase.class)
+	private static final Log LOG = LogFactory.getLog(MapPartitionITCase.class);
+
+	String inPath = null;
+	String resultPath = null;
+	
+	public MapPartitionITCase(Configuration testConfig) {
+		super(testConfig);
+	}
+
+	private static final String IN = "1 1\n2 2\n2 8\n4 4\n4 4\n6 6\n7 7\n8 8\n" +
+			"1 1\n2 2\n2 2\n4 4\n4 4\n6 3\n5 9\n8 8\n1 1\n2 2\n2 2\n3 0\n4 4\n" +
+			"5 9\n7 7\n8 8\n1 1\n9 1\n5 9\n4 4\n4 4\n6 6\n7 7\n8 8\n";
+
+	private static final String RESULT = "1 11\n2 12\n4 14\n4 14\n1 11\n2 12\n2 12\n4 14\n4 14\n3 16\n1 11\n2 12\n2 12\n0 13\n4 14\n1 11\n4 14\n4 14\n";
+
+	@Override
+	protected void preSubmit() throws Exception {
+		inPath = createTempFile("in.txt", IN);
+		resultPath = getTempDirPath("result");
+	}
+
+	/**
+	 * For every record whose key and value sum to less than 10, emits (value, key + 10).
+	 */
+	public static class TestMapPartition extends MapPartitionFunction implements Serializable {
+		private static final long serialVersionUID = 1L;
+
+		private StringValue keyString = new StringValue();
+		private StringValue valueString = new StringValue();
+
+		@Override
+		public void mapPartition(Iterator<Record> records, Collector<Record> out) throws Exception {
+			while (records.hasNext()) {
+				Record record = records.next();
+				keyString = record.getField(0, keyString);
+				valueString = record.getField(1, valueString);
+
+				LOG.debug("Processed: [" + keyString.toString() + "," + valueString.getValue() + "]");
+
+				final int key = Integer.parseInt(keyString.toString());
+				final int value = Integer.parseInt(valueString.toString());
+
+				if (key + value < 10) {
+					record.setField(0, valueString);
+					record.setField(1, new IntValue(key + 10));
+
+					out.collect(record);
+				}
+			}
+		}
+	}
+
+	@Override
+	protected Plan getTestJob() {
+		final int dop = config.getInteger("MapPartitionTest#NoSubtasks", 1);
+
+		FileDataSource input = new FileDataSource(
+				new ContractITCaseInputFormat(), inPath);
+		DelimitedInputFormat.configureDelimitedFormat(input)
+			.recordDelimiter('\n');
+		input.setDegreeOfParallelism(dop);
+
+		// Use the same configured key as the source: the former key "TestMapPartition#NoSubtasks"
+		// was never set by getConfigurations(), so the function under test always ran with one subtask.
+		MapPartitionOperator testMapper = MapPartitionOperator.builder(new TestMapPartition()).build();
+		testMapper.setDegreeOfParallelism(dop);
+
+		FileDataSink output = new FileDataSink(
+				new ContractITCaseOutputFormat(), resultPath);
+		output.setDegreeOfParallelism(1);
+
+		output.setInput(testMapper);
+		testMapper.setInput(input);
+
+		return new Plan(output);
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(RESULT, resultPath);
+	}
+
+	@Parameters
+	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+		LinkedList<Configuration> testConfigs = new LinkedList<Configuration>();
+
+		Configuration config = new Configuration();
+		config.setInteger("MapPartitionTest#NoSubtasks", 4);
+		testConfigs.add(config);
+
+		return toParameterList(testConfigs);
+	}
+}


[2/3] git commit: [FLINK-1053] Cleanup implementation of mapPartition function

Posted by se...@apache.org.
[FLINK-1053] Cleanup implementation of mapPartition function

This closes #42


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d0cead7e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d0cead7e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d0cead7e

Branch: refs/heads/master
Commit: d0cead7e19ab4c580705799adb5ca460dd228809
Parents: d4de977
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Aug 14 19:56:42 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Aug 14 22:43:26 2014 +0200

----------------------------------------------------------------------
 .../org/apache/flink/compiler/PactCompiler.java |   2 +
 .../flink/compiler/dag/MapPartitionNode.java    |  63 ++++++
 .../operators/MapPartitionDescriptor.java       |  66 +++++++
 .../api/common/functions/FlatMapFunction.java   |   2 +-
 .../common/functions/MapPartitionFunction.java  |  53 +++++
 .../base/MapPartitionOperatorBase.java          |  47 +++++
 .../java/org/apache/flink/api/java/DataSet.java |  25 +--
 .../functions/RichMapPartitionFunction.java     |  41 ++++
 .../java/operators/MapPartitionOperator.java    |  68 +++++++
 .../record/functions/MapPartitionFunction.java  |  46 +++++
 .../record/operators/MapPartitionOperator.java  | 193 +++++++++++++++++++
 .../flink/api/java/typeutils/TypeExtractor.java |  10 +
 .../runtime/operators/MapPartitionDriver.java   |  89 +++++++++
 .../flink/test/util/AbstractTestBase.java       |  12 ++
 .../flink/test/util/JavaProgramTestBase.java    |  35 ++++
 .../test/operators/MapPartitionITCase.java      | 101 ++++++++++
 .../compiler/dag/MapPartitionNode.java          |  56 ------
 .../operators/MapPartitionDescriptor.java       |  60 ------
 .../common/functions/GenericMapPartition.java   |  16 --
 .../base/MapPartitionOperatorBase.java          |  43 -----
 .../java/functions/MapPartitionFunction.java    |  36 ----
 .../java/operators/MapPartitionOperator.java    |  67 -------
 .../record/functions/MapPartitionFunction.java  |  44 -----
 .../record/operators/MapPartitionOperator.java  | 190 ------------------
 .../pact/runtime/task/MapPartitionDriver.java   |  92 ---------
 .../test/operators/MapPartitionITCase.java      | 130 -------------
 26 files changed, 840 insertions(+), 747 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
index f16a0a2..de5a3e1 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
@@ -45,6 +45,7 @@ import org.apache.flink.api.common.operators.base.GenericDataSourceBase;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.common.operators.base.MapPartitionOperatorBase;
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
 import org.apache.flink.api.common.operators.base.BulkIterationBase.PartialSolutionPlaceHolder;
 import org.apache.flink.api.common.operators.base.DeltaIterationBase.SolutionSetPlaceHolder;
@@ -64,6 +65,7 @@ import org.apache.flink.compiler.dag.FlatMapNode;
 import org.apache.flink.compiler.dag.GroupReduceNode;
 import org.apache.flink.compiler.dag.IterationNode;
 import org.apache.flink.compiler.dag.MapNode;
+import org.apache.flink.compiler.dag.MapPartitionNode;
 import org.apache.flink.compiler.dag.MatchNode;
 import org.apache.flink.compiler.dag.OptimizerNode;
 import org.apache.flink.compiler.dag.PactConnection;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/flink-compiler/src/main/java/org/apache/flink/compiler/dag/MapPartitionNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/MapPartitionNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/MapPartitionNode.java
new file mode 100644
index 0000000..fab5664
--- /dev/null
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/MapPartitionNode.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.compiler.dag;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.operators.SingleInputOperator;
+import org.apache.flink.compiler.DataStatistics;
+import org.apache.flink.compiler.operators.MapPartitionDescriptor;
+import org.apache.flink.compiler.operators.OperatorDescriptorSingle;
+
+/**
+ * The optimizer's internal representation of a <i>MapPartition</i> operator node.
+ */
+public class MapPartitionNode extends SingleInputNode {
+	
+	/**
+	 * Creates a new MapPartitionNode for the given operator.
+	 * 
+	 * @param operator The map partition operator object.
+	 */
+	public MapPartitionNode(SingleInputOperator<?, ?, ?> operator) {
+		super(operator);
+	}
+
+	@Override
+	public String getName() {
+		return "MapPartition";
+	}
+
+	@Override
+	protected List<OperatorDescriptorSingle> getPossibleProperties() {
+		// a map partition has exactly one execution strategy
+		return Collections.<OperatorDescriptorSingle>singletonList(new MapPartitionDescriptor());
+	}
+
+	/**
+	 * Computes the estimates for the MapPartition operator.
+	 * A MapPartition function may emit an arbitrary number of elements for its partition,
+	 * so no default estimates are derived from the input statistics.
+	 */
+	@Override
+	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
+		// we really cannot make any estimates here
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapPartitionDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapPartitionDescriptor.java
new file mode 100644
index 0000000..5392339
--- /dev/null
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapPartitionDescriptor.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.compiler.operators;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.compiler.dag.SingleInputNode;
+import org.apache.flink.compiler.dataproperties.GlobalProperties;
+import org.apache.flink.compiler.dataproperties.LocalProperties;
+import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.compiler.dataproperties.RequestedLocalProperties;
+import org.apache.flink.compiler.plan.Channel;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+
+/**
+ * Operator descriptor for the (single) execution strategy of a MapPartition operator.
+ * It requests default-constructed global and local properties from its input and passes
+ * the input's properties through unchanged.
+ */
+public class MapPartitionDescriptor extends OperatorDescriptorSingle {
+
+	@Override
+	public DriverStrategy getStrategy() {
+		return DriverStrategy.MAP_PARTITION;
+	}
+
+	@Override
+	public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
+		final String operatorName = node.getPactContract().getName();
+		final String planNodeName = "MapPartition (" + operatorName + ")";
+		return new SingleInputPlanNode(node, planNodeName, in, DriverStrategy.MAP_PARTITION);
+	}
+
+	@Override
+	protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
+		final RequestedGlobalProperties requested = new RequestedGlobalProperties();
+		return Collections.singletonList(requested);
+	}
+
+	@Override
+	protected List<RequestedLocalProperties> createPossibleLocalProperties() {
+		final RequestedLocalProperties requested = new RequestedLocalProperties();
+		return Collections.singletonList(requested);
+	}
+	
+	/** The global properties of the input are retained as-is. */
+	@Override
+	public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
+		return gProps;
+	}
+	
+	/** The local properties of the input are retained as-is. */
+	@Override
+	public LocalProperties computeLocalProperties(LocalProperties lProps) {
+		return lProps;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java
index 0c32eae..7809599 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java
@@ -23,7 +23,7 @@ import org.apache.flink.util.Collector;
 import java.io.Serializable;
 
 /**
- * interface flatMap functions. FlatMap functions take elements and transform them,
+ * Base interface for flatMap functions. FlatMap functions take elements and transform them,
  * into zero, one, or more elements. Typical applications can be splitting elements, or unnesting lists
  * and arrays. Operations that produce multiple strictly one result element per input element can also
  * use the {@link MapFunction}.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/flink-core/src/main/java/org/apache/flink/api/common/functions/MapPartitionFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/MapPartitionFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/MapPartitionFunction.java
new file mode 100644
index 0000000..fbd5c9e
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/MapPartitionFunction.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+/**
+ * Interface for "mapPartition" functions. A "mapPartition" function is called a single time per
+ * data partition and receives an Iterable with the data elements of that partition. It may return an
+ * arbitrary number of data elements.
+ * <p>
+ * This function is intended to provide enhanced flexibility in the processing of elements in a partition.
+ * For most of the simple use cases, consider using the {@link MapFunction} or {@link FlatMapFunction}.
+ * <p>
+ * The basic syntax for a MapPartitionFunction is as follows:
+ * <pre>{@code
+ * DataSet<X> input = ...;
+ * 
+ * DataSet<Y> result = input.mapPartition(new MyMapPartitionFunction());
+ * }</pre>
+ * 
+ * @param <T> Type of the input elements.
+ * @param <O> Type of the returned elements.
+ */
+public interface MapPartitionFunction<T, O> extends Function, Serializable {
+	
+	/**
+	 * A user-implemented function that processes all elements of one data partition and
+	 * emits its results through the given collector.
+	 *
+	 * @param records All records of the partition.
+	 * @param out The collector to hand results to.
+	 * @throws Exception Implementations may propagate any exception.
+	 */
+	void mapPartition(Iterable<T> records, Collector<O> out) throws Exception;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java
new file mode 100644
index 0000000..ae577fd
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators.base;
+
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.operators.SingleInputOperator;
+import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
+import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+
+/**
+ * Base operator for MapPartition operations: a single-input operator whose user-defined
+ * function is a {@link MapPartitionFunction}.
+ *
+ * @param <IN> The input type.
+ * @param <OUT> The result type.
+ * @param <FT> The type of the user-defined function.
+ */
+public class MapPartitionOperatorBase<IN, OUT, FT extends MapPartitionFunction<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
+	
+	/**
+	 * Creates the operator from an already wrapped user function.
+	 *
+	 * @param udf The wrapper around the user function.
+	 * @param operatorInfo The operator information describing the input and output types.
+	 * @param name The name of the operator.
+	 */
+	public MapPartitionOperatorBase(UserCodeWrapper<FT> udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {
+		super(udf, operatorInfo, name);
+	}
+	
+	/**
+	 * Creates the operator from a user function instance.
+	 *
+	 * @param udf The user function instance.
+	 * @param operatorInfo The operator information describing the input and output types.
+	 * @param name The name of the operator.
+	 */
+	public MapPartitionOperatorBase(FT udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {
+		this(new UserCodeObjectWrapper<FT>(udf), operatorInfo, name);
+	}
+	
+	/**
+	 * Creates the operator from a user function class.
+	 *
+	 * @param udf The class of the user function.
+	 * @param operatorInfo The operator information describing the input and output types.
+	 * @param name The name of the operator.
+	 */
+	public MapPartitionOperatorBase(Class<? extends FT> udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {
+		this(new UserCodeClassWrapper<FT>(udf), operatorInfo, name);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 2b15007..d8450b5 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.io.FileOutputFormat;
@@ -37,7 +38,6 @@ import org.apache.flink.api.java.operators.AggregateOperator;
 import org.apache.flink.api.java.operators.CoGroupOperator;
 import org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets;
 import org.apache.flink.api.java.operators.CrossOperator;
-import org.apache.flink.api.java.operators.CrossOperator.DefaultCross;
 import org.apache.flink.api.java.operators.CustomUnaryOperation;
 import org.apache.flink.api.java.operators.DataSink;
 import org.apache.flink.api.java.operators.DistinctOperator;
@@ -47,6 +47,7 @@ import org.apache.flink.api.java.operators.JoinOperator.JoinHint;
 import org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.operators.MapOperator;
+import org.apache.flink.api.java.operators.MapPartitionOperator;
 import org.apache.flink.api.java.operators.ProjectOperator.Projection;
 import org.apache.flink.api.java.operators.GroupReduceOperator;
 import org.apache.flink.api.java.operators.ReduceOperator;
@@ -144,7 +145,7 @@ public abstract class DataSet<T> {
 
 
     /**
-     * Applies a Map operation to the entire partition of the data.
+     * Applies a Map-style operation to the entire partition of the data.
 	 * The function is called once per parallel partition of the data,
 	 * and the entire partition is available through the given Iterator.
 	 * The number of elements that each instance of the MapPartition function
@@ -583,19 +584,19 @@ public abstract class DataSet<T> {
 	 *   both DataSets, i.e., it builds a Cartesian product.
 	 * 
 	 * <p>
-	 * The resulting {@link DefaultCross} wraps each pair of crossed elements into a {@link Tuple2}, with 
+	 * The resulting {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross} wraps each pair of crossed elements into a {@link Tuple2}, with 
 	 * the element of the first input being the first field of the tuple and the element of the 
 	 * second input being the second field of the tuple.
 	 * 
 	 * <p>
-	 * Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(CrossFunction)} to define a {@link CrossFunction} which is called for
+	 * Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a {@link org.apache.flink.api.common.functions.CrossFunction} which is called for
 	 * each pair of crossed elements. The CrossFunction returns exactly one element for each pair of input elements.</br>
 	 * 
 	 * @param other The other DataSet with which this DataSet is crossed. 
 	 * @return A DefaultCross that returns a Tuple2 for each pair of crossed elements.
 	 * 
-	 * @see DefaultCross
-	 * @see CrossFunction
+	 * @see org.apache.flink.api.java.operators.CrossOperator.DefaultCross
+	 * @see org.apache.flink.api.common.functions.CrossFunction
 	 * @see DataSet
 	 * @see Tuple2
 	 */
@@ -612,19 +613,19 @@ public abstract class DataSet<T> {
 	 *   smaller than the first one.
 	 *   
 	 * <p>
-	 * The resulting {@link DefaultCross} wraps each pair of crossed elements into a {@link Tuple2}, with 
+	 * The resulting {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross} wraps each pair of crossed elements into a {@link Tuple2}, with 
 	 * the element of the first input being the first field of the tuple and the element of the 
 	 * second input being the second field of the tuple.
 	 *   
 	 * <p>
-	 * Call {@link DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a
+	 * Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a
 	 * {@link org.apache.flink.api.common.functions.CrossFunction} which is called for
 	 * each pair of crossed elements. The CrossFunction returns exactly one element for each pair of input elements.</br>
 	 * 
 	 * @param other The other DataSet with which this DataSet is crossed. 
 	 * @return A DefaultCross that returns a Tuple2 for each pair of crossed elements.
 	 * 
-	 * @see DefaultCross
+	 * @see org.apache.flink.api.java.operators.CrossOperator.DefaultCross
 	 * @see org.apache.flink.api.common.functions.CrossFunction
 	 * @see DataSet
 	 * @see Tuple2
@@ -642,19 +643,19 @@ public abstract class DataSet<T> {
 	 *   larger than the first one.
 	 *   
 	 * <p>
-	 * The resulting {@link DefaultCross} wraps each pair of crossed elements into a {@link Tuple2}, with 
+	 * The resulting {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross} wraps each pair of crossed elements into a {@link Tuple2}, with 
 	 * the element of the first input being the first field of the tuple and the element of the 
 	 * second input being the second field of the tuple.
 	 *   
 	 * <p>
-	 * Call {@link DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a
+	 * Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a
 	 * {@link org.apache.flink.api.common.functions.CrossFunction} which is called for
 	 * each pair of crossed elements. The CrossFunction returns exactly one element for each pair of input elements.</br>
 	 * 
 	 * @param other The other DataSet with which this DataSet is crossed. 
 	 * @return A DefaultCross that returns a Tuple2 for each pair of crossed elements.
 	 * 
-	 * @see DefaultCross
+	 * @see org.apache.flink.api.java.operators.CrossOperator.DefaultCross
 	 * @see org.apache.flink.api.common.functions.CrossFunction
 	 * @see DataSet
 	 * @see Tuple2

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/flink-java/src/main/java/org/apache/flink/api/java/functions/RichMapPartitionFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichMapPartitionFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichMapPartitionFunction.java
new file mode 100644
index 0000000..04176a2
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichMapPartitionFunction.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.functions;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * Abstract base class for rich {@link MapPartitionFunction}s. Being a {@link RichFunction}, it
+ * offers the life-cycle hooks {@link RichFunction#open(org.apache.flink.configuration.Configuration)}
+ * and {@link RichFunction#close()}, as well as access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext}.
+ * 
+ * @param <I> Type of the input elements.
+ * @param <O> Type of the returned elements.
+ */
+public abstract class RichMapPartitionFunction<I, O> extends AbstractRichFunction implements MapPartitionFunction<I, O> {
+
+	private static final long serialVersionUID = 1L;
+	
+	/**
+	 * Transforms all elements of one parallel partition.
+	 *
+	 * @param values The elements of the partition.
+	 * @param out The collector that receives the produced elements.
+	 * @throws Exception Implementations may forward exceptions to the runtime.
+	 */
+	@Override
+	public abstract void mapPartition(Iterable<I> values, Collector<O> out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
new file mode 100644
index 0000000..caf55f9
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators;
+
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.base.MapPartitionOperatorBase;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.DataSet;
+
+/**
+ * Java API operator that applies a {@link MapPartitionFunction} to a data set. It represents both
+ * the application of the function and the data set the function produces, and translates into a
+ * {@link MapPartitionOperatorBase} of the common API.
+ * 
+ * @param <IN> The type of the data set consumed by the operator.
+ * @param <OUT> The type of the data set created by the operator.
+ * 
+ * @see MapPartitionFunction
+ */
+public class MapPartitionOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapPartitionOperator<IN, OUT>> {
+	
+	/** The user function applied to the partitions of the input. */
+	protected final MapPartitionFunction<IN, OUT> function;
+	
+	/**
+	 * Creates the operator. The result type is obtained from
+	 * {@link TypeExtractor#getMapPartitionReturnTypes}, and semantic annotations are read from the
+	 * function's class.
+	 *
+	 * @param input The data set the function is applied to.
+	 * @param function The user-defined MapPartition function.
+	 */
+	public MapPartitionOperator(DataSet<IN> input, MapPartitionFunction<IN, OUT> function) {
+		super(input, TypeExtractor.getMapPartitionReturnTypes(function, input.getType()));
+		
+		this.function = function;
+		extractSemanticAnnotationsFromUdf(function.getClass());
+	}
+
+	@Override
+	protected MapPartitionOperatorBase<IN, OUT, MapPartitionFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) {
+		// fall back to the UDF's class name when no explicit name was given
+		final String operatorName;
+		if (getName() != null) {
+			operatorName = getName();
+		} else {
+			operatorName = function.getClass().getName();
+		}
+		
+		final UnaryOperatorInformation<IN, OUT> typeInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
+		final MapPartitionOperatorBase<IN, OUT, MapPartitionFunction<IN, OUT>> mapPartitionBase =
+				new MapPartitionOperatorBase<IN, OUT, MapPartitionFunction<IN, OUT>>(function, typeInfo, operatorName);
+		mapPartitionBase.setInput(input);
+		
+		if (this.getParallelism() <= 0) {
+			// no explicit dop: inherit the input's dop so that the two operators can be chained
+			mapPartitionBase.setDegreeOfParallelism(input.getDegreeOfParallelism());
+		} else {
+			mapPartitionBase.setDegreeOfParallelism(this.getParallelism());
+		}
+		
+		return mapPartitionBase;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapPartitionFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapPartitionFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapPartitionFunction.java
new file mode 100644
index 0000000..4eb049d
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapPartitionFunction.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.record.functions;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.Collector;
+
+/**
+ * The MapPartitionFunction must be extended to provide a MapPartition implementation for the
+ * Record API. The function is called once per parallel partition of its input and receives all
+ * records of that partition through a single {@link Iterable}.
+ */
+public abstract class MapPartitionFunction extends AbstractRichFunction implements org.apache.flink.api.common.functions.MapPartitionFunction<Record, Record> {
+	
+	private static final long serialVersionUID = 1L;
+	
+	/**
+	 * This method must be implemented to provide a user implementation of a MapPartitionFunction.
+	 * It is called once per parallel partition, with all records of that partition.
+	 *
+	 * @param records All input records of the partition.
+	 * @param out A collector that collects all output records.
+	 *
+	 * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the
+	 *                   runtime catches an exception, it aborts the map task and lets the fail-over logic
+	 *                   decide whether to retry the mapper execution.
+	 */
+	@Override
+	public abstract void mapPartition(Iterable<Record> records, Collector<Record> out) throws Exception;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapPartitionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapPartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapPartitionOperator.java
new file mode 100644
index 0000000..6d4f991
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapPartitionOperator.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.record.operators;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.RecordOperator;
+import org.apache.flink.api.common.operators.base.MapPartitionOperatorBase;
+import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
+import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.java.record.functions.FunctionAnnotation;
+import org.apache.flink.api.java.record.functions.MapPartitionFunction;
+import org.apache.flink.types.Key;
+import org.apache.flink.types.Record;
+
+/**
+ * MapPartitionOperator that applies a {@link MapPartitionFunction} to each parallel partition of
+ * its input; the function receives all records of a partition at once.
+ * 
+ * @see MapPartitionFunction
+ */
+public class MapPartitionOperator extends MapPartitionOperatorBase<Record, Record, MapPartitionFunction> implements RecordOperator {
+	
+	/** Name used by the Builder when no name is set explicitly. */
+	private static final String DEFAULT_NAME = "<Unnamed MapPartition>";
+	
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Creates a Builder with the provided {@link MapPartitionFunction} implementation.
+	 * 
+	 * @param udf The {@link MapPartitionFunction} implementation for this MapPartition operator.
+	 * @return A new Builder for the operator.
+	 */
+	public static Builder builder(MapPartitionFunction udf) {
+		return new Builder(new UserCodeObjectWrapper<MapPartitionFunction>(udf));
+	}
+	
+	/**
+	 * Creates a Builder with the provided {@link MapPartitionFunction} implementation class.
+	 * 
+	 * @param udf The {@link MapPartitionFunction} class for this MapPartition operator.
+	 * @return A new Builder for the operator.
+	 */
+	public static Builder builder(Class<? extends MapPartitionFunction> udf) {
+		return new Builder(new UserCodeClassWrapper<MapPartitionFunction>(udf));
+	}
+	
+	/**
+	 * The constructor that only gets invoked from the Builder. Sets the union of the builder's
+	 * inputs (if any), the broadcast variables, and the semantic properties read from the UDF.
+	 * 
+	 * @param builder The builder holding the operator's configuration.
+	 */
+	protected MapPartitionOperator(Builder builder) {
+
+		super(builder.udf, OperatorInfoHelper.unary(), builder.name);
+		
+		if (builder.inputs != null && !builder.inputs.isEmpty()) {
+			setInput(Operator.createUnionCascade(builder.inputs));
+		}
+		
+		setBroadcastVariables(builder.broadcastInputs);
+		setSemanticProperties(FunctionAnnotation.readSingleConstantAnnotations(builder.udf));
+	}
+	
+
+	@Override
+	public Class<? extends Key<?>>[] getKeyClasses() {
+		return emptyClassArray();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Builder pattern, straight from Joshua Bloch's Effective Java (2nd Edition).
+	 */
+	public static class Builder {
+		
+		/* The required parameters */
+		private final UserCodeWrapper<MapPartitionFunction> udf;
+		
+		/* The optional parameters */
+		private List<Operator<Record>> inputs;
+		private Map<String, Operator<Record>> broadcastInputs;
+		private String name = DEFAULT_NAME;
+		
+		/**
+		 * Creates a Builder with the provided {@link MapPartitionFunction} implementation.
+		 * 
+		 * @param udf The {@link MapPartitionFunction} implementation for this MapPartition operator.
+		 */
+		private Builder(UserCodeWrapper<MapPartitionFunction> udf) {
+			this.udf = udf;
+			this.inputs = new ArrayList<Operator<Record>>();
+			this.broadcastInputs = new HashMap<String, Operator<Record>>();
+		}
+		
+		/**
+		 * Sets the input, replacing any previously set inputs.
+		 * 
+		 * @param input The input.
+		 * @return This builder.
+		 */
+		public Builder input(Operator<Record> input) {
+			Validate.notNull(input, "The input must not be null");
+			
+			this.inputs.clear();
+			this.inputs.add(input);
+			return this;
+		}
+		
+		/**
+		 * Sets one or several inputs (union), replacing any previously set inputs.
+		 * 
+		 * @param inputs The inputs.
+		 * @return This builder.
+		 */
+		public Builder input(Operator<Record>...inputs) {
+			this.inputs.clear();
+			for (Operator<Record> c : inputs) {
+				this.inputs.add(c);
+			}
+			return this;
+		}
+		
+		/**
+		 * Sets the inputs (union). The given list is copied, so the builder and the caller do not
+		 * modify each other's list; a {@code null} list is treated as "no inputs".
+		 * 
+		 * @param inputs The inputs.
+		 * @return This builder.
+		 */
+		public Builder inputs(List<Operator<Record>> inputs) {
+			// copy defensively: input(...) calls clear() on this list, which must neither modify
+			// nor fail on (if unmodifiable) the caller's list
+			this.inputs = inputs == null ? new ArrayList<Operator<Record>>() : new ArrayList<Operator<Record>>(inputs);
+			return this;
+		}
+		
+		/**
+		 * Binds the result produced by the plan rooted at {@code input} to the broadcast
+		 * variable {@code name} used by the UDF wrapped in this operator.
+		 * 
+		 * @param name The name of the broadcast variable.
+		 * @param input The root of the plan producing the variable's data.
+		 * @return This builder.
+		 */
+		public Builder setBroadcastVariable(String name, Operator<Record> input) {
+			this.broadcastInputs.put(name, input);
+			return this;
+		}
+		
+		/**
+		 * Binds multiple broadcast variables, replacing all previously bound ones.
+		 * 
+		 * @param inputs The broadcast variables, keyed by name.
+		 * @return This builder.
+		 */
+		public Builder setBroadcastVariables(Map<String, Operator<Record>> inputs) {
+			this.broadcastInputs.clear();
+			this.broadcastInputs.putAll(inputs);
+			return this;
+		}
+		
+		/**
+		 * Sets the name of this operator.
+		 * 
+		 * @param name The name.
+		 * @return This builder.
+		 */
+		public Builder name(String name) {
+			this.name = name;
+			return this;
+		}
+		
+		/**
+		 * Creates and returns a MapPartitionOperator from the values given to the builder. If the
+		 * name was explicitly set to {@code null}, the UDF's class name is used instead.
+		 * 
+		 * @return The created operator
+		 */
+		public MapPartitionOperator build() {
+			if (name == null) {
+				name = udf.getUserCodeClass().getName();
+			}
+			return new MapPartitionOperator(this);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index b596a87..a8a833f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -38,6 +38,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.java.functions.InvalidTypesException;
@@ -81,6 +82,15 @@ public class TypeExtractor {
 	}
 	
 	@SuppressWarnings("unchecked")
+	public static <IN, OUT> TypeInformation<OUT> getMapPartitionReturnTypes(MapPartitionFunction<IN, OUT> mapInterface, TypeInformation<IN> inType) {
+		validateInputType(MapPartitionFunction.class, mapInterface.getClass(), 0, inType);
+		if(mapInterface instanceof ResultTypeQueryable) {
+			return ((ResultTypeQueryable<OUT>) mapInterface).getProducedType();
+		}
+		return new TypeExtractor().privateCreateTypeInfo(MapPartitionFunction.class, mapInterface.getClass(), 1, inType, null);
+	}
+	
+	@SuppressWarnings("unchecked")
 	public static <IN, OUT> TypeInformation<OUT> getGroupReduceReturnTypes(GroupReduceFunction<IN, OUT> groupReduceInterface,
 			TypeInformation<IN> inType) {
 		validateInputType(GroupReduceFunction.class, groupReduceInterface.getClass(), 0, inType);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java
new file mode 100644
index 0000000..260bae5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators;
+
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.runtime.util.MutableToRegularIteratorWrapper;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+/**
+ * MapPartition task which is executed by a Nephele task manager. The task has a single input and
+ * one or multiple outputs, and is provided with a {@link MapPartitionFunction} implementation.
+ * <p>
+ * Instead of invoking the user function once per record, the driver wraps its input in a
+ * {@link MutableToRegularIteratorWrapper} and hands it to
+ * {@link MapPartitionFunction#mapPartition(Iterable, Collector)} in a single call.
+ *
+ * @see MapPartitionFunction
+ *
+ * @param <IT> The input data type of the MapPartition function.
+ * @param <OT> The output data type of the MapPartition function.
+ */
+public class MapPartitionDriver<IT, OT> implements PactDriver<MapPartitionFunction<IT, OT>, OT> {
+
+	private PactTaskContext<MapPartitionFunction<IT, OT>, OT> taskContext;
+
+	@Override
+	public void setup(PactTaskContext<MapPartitionFunction<IT, OT>, OT> context) {
+		this.taskContext = context;
+	}
+
+	@Override
+	public int getNumberOfInputs() {
+		return 1;
+	}
+
+	@Override
+	public Class<MapPartitionFunction<IT, OT>> getStubType() {
+		@SuppressWarnings("unchecked")
+		final Class<MapPartitionFunction<IT, OT>> clazz = (Class<MapPartitionFunction<IT, OT>>) (Class<?>) MapPartitionFunction.class;
+		return clazz;
+	}
+
+	@Override
+	public boolean requiresComparatorOnInput() {
+		return false;
+	}
+
+	@Override
+	public void prepare() {
+		// nothing, since a mapper does not need any preparation
+	}
+
+	@Override
+	public void run() throws Exception {
+		// cache references on the stack
+		final MutableObjectIterator<IT> input = this.taskContext.getInput(0);
+		final MapPartitionFunction<IT, OT> function = this.taskContext.getStub();
+		final Collector<OT> output = this.taskContext.getOutputCollector();
+
+		// expose the whole input to the user function through one regular iterable
+		final MutableToRegularIteratorWrapper<IT> inIter = new MutableToRegularIteratorWrapper<IT>(input, this.taskContext.<IT>getInputSerializer(0).getSerializer());
+		function.mapPartition(inIter, output);
+	}
+
+	@Override
+	public void cleanup() {
+		// mappers need no cleanup, since no strategies are used.
+	}
+
+	@Override
+	public void cancel() {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index 06df6a1..5ba2306 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -32,6 +32,7 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
@@ -293,6 +294,17 @@ public abstract class AbstractTestBase {
 		}
 	}
 	
+	/**
+	 * Asserts that two lists contain equal elements, ignoring their order. Copies of both lists are
+	 * sorted with the given comparator and then compared position by position with {@code equals}.
+	 * The argument lists themselves are not modified.
+	 *
+	 * @param expected The expected elements.
+	 * @param actual The elements produced by the test.
+	 * @param comparator The order used to align both lists before they are compared.
+	 */
+	public static <X> void compareResultCollections(List<X> expected, List<X> actual, Comparator<X> comparator) {
+		Assert.assertEquals("Wrong number of result elements", expected.size(), actual.size());
+		
+		// sort copies: the arguments may be unmodifiable, and callers should not see them reordered
+		List<X> sortedExpected = new ArrayList<X>(expected);
+		List<X> sortedActual = new ArrayList<X>(actual);
+		Collections.sort(sortedExpected, comparator);
+		Collections.sort(sortedActual, comparator);
+		
+		for (int i = 0; i < sortedExpected.size(); i++) {
+			Assert.assertEquals("Mismatch at sorted position " + i, sortedExpected.get(i), sortedActual.get(i));
+		}
+	}
+	
 	private File[] getAllInvolvedFiles(String resultPath) {
 		File result = asFile(resultPath);
 		if (!result.exists()) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
index 9307761..689c424 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
@@ -19,6 +19,8 @@
 
 package org.apache.flink.test.util;
 
+import java.util.Comparator;
+
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.client.minicluster.NepheleMiniCluster;
@@ -33,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.junit.Assert;
 import org.junit.Test;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple;
 
 
 public abstract class JavaProgramTestBase extends AbstractTestBase {
@@ -179,4 +182,36 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
 			initializeContextEnvironment(this);
 		}
 	}
+	
+	/**
+	 * Orders tuples first by arity (fewer fields first) and then lexicographically by their fields.
+	 * Every compared field must be {@link Comparable}; otherwise the calling test fails.
+	 *
+	 * @param <T> The tuple type.
+	 */
+	public static class TupleComparator<T extends Tuple> implements Comparator<T> {
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public int compare(T o1, T o2) {
+			final int arity = o1.getArity();
+			final int otherArity = o2.getArity();
+			
+			if (arity != otherArity) {
+				return arity < otherArity ? -1 : 1;
+			}
+			
+			int field = 0;
+			while (field < arity) {
+				Object left = o1.getField(field);
+				Object right = o2.getField(field);
+				
+				if (!(left instanceof Comparable) || !(right instanceof Comparable)) {
+					Assert.fail("Cannot compare tuple fields");
+				}
+				
+				int result = ((Comparable<Object>) left).compareTo(right);
+				if (result != 0) {
+					return result;
+				}
+				field++;
+			}
+			
+			return 0;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/flink-tests/src/test/java/org/apache/flink/test/operators/MapPartitionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/MapPartitionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/MapPartitionITCase.java
new file mode 100644
index 0000000..55b9544
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/MapPartitionITCase.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.util.Collector;
+
+@SuppressWarnings("serial")
+public class MapPartitionITCase extends JavaProgramTestBase {
+
+	private static final String IN = "1 1\n2 2\n2 8\n4 4\n4 4\n6 6\n7 7\n8 8\n"
+			+ "1 1\n2 2\n2 2\n4 4\n4 4\n6 3\n5 9\n8 8\n1 1\n2 2\n2 2\n3 0\n4 4\n"
+			+ "5 9\n7 7\n8 8\n1 1\n9 1\n5 9\n4 4\n4 4\n6 6\n7 7\n8 8\n";
+
+	private static final String RESULT = "1 11\n2 12\n4 14\n4 14\n1 11\n2 12\n2 12\n4 14\n4 14\n3 16\n1 11\n2 12\n2 12\n0 13\n4 14\n1 11\n4 14\n4 14\n";
+	
+	
+	private List<Tuple2<String, String>> input = new ArrayList<Tuple2<String,String>>();
+	
+	private List<Tuple2<String, Integer>> expected = new ArrayList<Tuple2<String,Integer>>();
+	
+	private List<Tuple2<String, Integer>> result = new ArrayList<Tuple2<String,Integer>>();
+	
+
+	@Override
+	protected void preSubmit() throws Exception {
+
+		// create input
+		for (String s :IN.split("\n")) {
+			String[] fields = s.split(" ");
+			input.add(new Tuple2<String, String>(fields[0], fields[1]));
+		}
+		
+		// create expected
+		for (String s : RESULT.split("\n")) {
+			String[] fields = s.split(" ");
+			expected.add(new Tuple2<String, Integer>(fields[0], Integer.parseInt(fields[1])));
+		}
+		
+	}
+	
+	@Override
+	protected void postSubmit() {
+		compareResultCollections(expected, result, new TupleComparator<Tuple2<String, Integer>>());
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		DataSet<Tuple2<String, String>> data = env.fromCollection(input);
+		
+		data.mapPartition(new TestMapPartition()).output(new LocalCollectionOutputFormat<Tuple2<String,Integer>>(result));
+		
+		env.execute();
+	}
+	
+	
+	public static class TestMapPartition implements MapPartitionFunction<Tuple2<String, String>, Tuple2<String, Integer>> {
+
+		@Override
+		public void mapPartition(Iterable<Tuple2<String, String>> records, Collector<Tuple2<String, Integer>> out) {
+			for (Tuple2<String, String> record : records) {
+				String keyString = record.f0;
+				String valueString = record.f1;
+				
+				int keyInt = Integer.parseInt(keyString);
+				int valueInt = Integer.parseInt(valueString);
+
+				if (keyInt + valueInt < 10) {
+					out.collect(new Tuple2<String, Integer>(valueString, keyInt + 10));
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/MapPartitionNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/MapPartitionNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/MapPartitionNode.java
deleted file mode 100644
index 0d5e4ad..0000000
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/MapPartitionNode.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.compiler.dag;
-
-import java.util.Collections;
-import java.util.List;
-
-import eu.stratosphere.api.common.operators.SingleInputOperator;
-import eu.stratosphere.compiler.DataStatistics;
-import eu.stratosphere.compiler.operators.MapPartitionDescriptor;
-import eu.stratosphere.compiler.operators.OperatorDescriptorSingle;
-
-/**
- * The optimizer's internal representation of a <i>MapPartition</i> operator node.
- */
-public class MapPartitionNode extends SingleInputNode {
-	
-	/**
-	 * Creates a new MapNode for the given contract.
-	 * 
-	 * @param operator The map partition contract object.
-	 */
-	public MapPartitionNode(SingleInputOperator<?, ?, ?> operator) {
-		super(operator);
-	}
-
-	@Override
-	public String getName() {
-		return "MapPartition";
-	}
-
-	@Override
-	protected List<OperatorDescriptorSingle> getPossibleProperties() {
-		return Collections.<OperatorDescriptorSingle>singletonList(new MapPartitionDescriptor());
-	}
-
-	/**
-	 * Computes the estimates for the MapPartition operator.
-	 * We assume that by default, Map takes one value and transforms it into another value.
-	 * The cardinality consequently stays the same.
-	 */
-	@Override
-	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/MapPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/MapPartitionDescriptor.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/MapPartitionDescriptor.java
deleted file mode 100644
index 41b707d..0000000
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/MapPartitionDescriptor.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.compiler.operators;
-
-import java.util.Collections;
-import java.util.List;
-
-import eu.stratosphere.compiler.dag.SingleInputNode;
-import eu.stratosphere.compiler.dataproperties.GlobalProperties;
-import eu.stratosphere.compiler.dataproperties.LocalProperties;
-import eu.stratosphere.compiler.dataproperties.RequestedGlobalProperties;
-import eu.stratosphere.compiler.dataproperties.RequestedLocalProperties;
-import eu.stratosphere.compiler.plan.Channel;
-import eu.stratosphere.compiler.plan.SingleInputPlanNode;
-import eu.stratosphere.pact.runtime.task.DriverStrategy;
-
-
-public class MapPartitionDescriptor extends OperatorDescriptorSingle {
-
-	@Override
-	public DriverStrategy getStrategy() {
-		return DriverStrategy.MAP_PARTITION;
-	}
-
-	@Override
-	public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
-		return new SingleInputPlanNode(node, "MapPartition ("+node.getPactContract().getName()+")", in, DriverStrategy.MAP_PARTITION);
-	}
-
-	@Override
-	protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
-		return Collections.singletonList(new RequestedGlobalProperties());
-	}
-
-	@Override
-	protected List<RequestedLocalProperties> createPossibleLocalProperties() {
-		return Collections.singletonList(new RequestedLocalProperties());
-	}
-	
-	@Override
-	public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
-		return gProps;
-	}
-	
-	@Override
-	public LocalProperties computeLocalProperties(LocalProperties lProps) {
-		return lProps;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericMapPartition.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericMapPartition.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericMapPartition.java
deleted file mode 100644
index accf731..0000000
--- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericMapPartition.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package eu.stratosphere.api.common.functions;
-
-import eu.stratosphere.util.Collector;
-
-import java.util.Iterator;
-
-public interface GenericMapPartition<T, O> extends Function {
-	/**
-	 * A user-implemented function that modifies or transforms an incoming object.
-	 *
-	 * @param records All records for the mapper
-	 * @param out The collector to hand results to.
-	 * @throws Exception
-	 */
-	void mapPartition(Iterator<T> records, Collector<O> out) throws Exception;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/base/MapPartitionOperatorBase.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/base/MapPartitionOperatorBase.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/base/MapPartitionOperatorBase.java
deleted file mode 100644
index 9e3552a..0000000
--- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/base/MapPartitionOperatorBase.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.api.common.operators.base;
-
-import eu.stratosphere.api.common.functions.GenericMapPartition;
-import eu.stratosphere.api.common.operators.SingleInputOperator;
-import eu.stratosphere.api.common.operators.UnaryOperatorInformation;
-import eu.stratosphere.api.common.operators.util.UserCodeClassWrapper;
-import eu.stratosphere.api.common.operators.util.UserCodeObjectWrapper;
-import eu.stratosphere.api.common.operators.util.UserCodeWrapper;
-
-
-/**
- *
- * @param <IN> The input type.
- * @param <OUT> The result type.
- * @param <FT> The type of the user-defined function.
- */
-public class MapPartitionOperatorBase<IN, OUT, FT extends GenericMapPartition<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
-	
-	public MapPartitionOperatorBase(UserCodeWrapper<FT> udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {
-		super(udf, operatorInfo, name);
-	}
-	
-	public MapPartitionOperatorBase(FT udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {
-		super(new UserCodeObjectWrapper<FT>(udf), operatorInfo, name);
-	}
-	
-	public MapPartitionOperatorBase(Class<? extends FT> udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {
-		super(new UserCodeClassWrapper<FT>(udf), operatorInfo, name);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/MapPartitionFunction.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/MapPartitionFunction.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/MapPartitionFunction.java
deleted file mode 100644
index 4c0155f..0000000
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/MapPartitionFunction.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.api.java.functions;
-
-
-import eu.stratosphere.api.common.functions.AbstractFunction;
-import eu.stratosphere.api.common.functions.GenericMapPartition;
-import eu.stratosphere.util.Collector;
-
-import java.util.Iterator;
-
-public abstract class MapPartitionFunction<IN, OUT> extends AbstractFunction implements GenericMapPartition<IN, OUT> {
-
-	private static final long serialVersionUID = 1L;
-	/**
-	 *
-	 * @param records All records for the mapper
-	 * @param out The collector to hand results to.
-	 *
-	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
-	 *                   to fail and may trigger recovery.
-	 */
-	@Override
-	public abstract void mapPartition(Iterator<IN> records, Collector<OUT> out) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/MapPartitionOperator.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/MapPartitionOperator.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/MapPartitionOperator.java
deleted file mode 100644
index 836b205..0000000
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/MapPartitionOperator.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/***********************************************************************************************************************
- *
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- *
- **********************************************************************************************************************/
-package eu.stratosphere.api.java.operators;
-
-import eu.stratosphere.api.common.functions.GenericMapPartition;
-import eu.stratosphere.api.common.operators.Operator;
-import eu.stratosphere.api.common.operators.UnaryOperatorInformation;
-import eu.stratosphere.api.common.operators.base.MapPartitionOperatorBase;
-import eu.stratosphere.api.java.DataSet;
-import eu.stratosphere.api.java.functions.MapFunction;
-import eu.stratosphere.api.java.functions.MapPartitionFunction;
-import eu.stratosphere.api.java.typeutils.TypeExtractor;
-
-/**
- * This operator represents the application of a "mapPartition" function on a data set, and the
- * result data set produced by the function.
- * 
- * @param <IN> The type of the data set consumed by the operator.
- * @param <OUT> The type of the data set created by the operator.
- * 
- * @see MapFunction
- */
-public class MapPartitionOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapPartitionOperator<IN, OUT>> {
-	
-	protected final MapPartitionFunction<IN, OUT> function;
-	
-	
-	public MapPartitionOperator(DataSet<IN> input, MapPartitionFunction<IN, OUT> function) {
-		super(input, TypeExtractor.getMapPartitionReturnTypes(function, input.getType()));
-		
-		this.function = function;
-		extractSemanticAnnotationsFromUdf(function.getClass());
-	}
-
-	@Override
-	protected eu.stratosphere.api.common.operators.base.MapPartitionOperatorBase<IN, OUT, GenericMapPartition<IN, OUT>> translateToDataFlow(Operator<IN> input) {
-		
-		String name = getName() != null ? getName() : function.getClass().getName();
-		// create operator
-		MapPartitionOperatorBase<IN, OUT, GenericMapPartition<IN, OUT>> po = new MapPartitionOperatorBase<IN, OUT, GenericMapPartition<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
-		// set input
-		po.setInput(input);
-		// set dop
-		if(this.getParallelism() > 0) {
-			// use specified dop
-			po.setDegreeOfParallelism(this.getParallelism());
-		} else {
-			// if no dop has been specified, use dop of input operator to enable chaining
-			po.setDegreeOfParallelism(input.getDegreeOfParallelism());
-		}
-		
-		return po;
-	}
-	
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/MapPartitionFunction.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/MapPartitionFunction.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/MapPartitionFunction.java
deleted file mode 100644
index 5284d06..0000000
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/MapPartitionFunction.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.api.java.record.functions;
-
-import eu.stratosphere.api.common.functions.AbstractFunction;
-import eu.stratosphere.api.common.functions.GenericMapPartition;
-import eu.stratosphere.types.Record;
-import eu.stratosphere.util.Collector;
-
-import java.util.Iterator;
-
-/**
- * The MapPartitionFunction must be extended to provide a map partition implementation
- * By definition, the map partition is called for a full input set.
- */
-public abstract class MapPartitionFunction extends AbstractFunction implements GenericMapPartition<Record, Record> {
-
-	private static final long serialVersionUID = 1L;
-
-	/**
-	 * This method must be implemented to provide a user implementation of a mappartitioner.
-	 * It is called for a full input set.
-	 *
-	 * @param records all input records
-	 * @param out A collector that collects all output records.
-	 *
-	 * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the
-	 *                   runtime catches an exception, it aborts the map task and lets the fail-over logic
-	 *                   decide whether to retry the mapper execution.
-	 */
-	@Override
-	public abstract void mapPartition(Iterator<Record> records, Collector<Record> out) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/MapPartitionOperator.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/MapPartitionOperator.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/MapPartitionOperator.java
deleted file mode 100644
index 1ac132c..0000000
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/MapPartitionOperator.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.api.java.record.operators;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import eu.stratosphere.api.common.operators.base.MapPartitionOperatorBase;
-import eu.stratosphere.api.java.record.functions.MapPartitionFunction;
-import org.apache.commons.lang3.Validate;
-
-
-
-import eu.stratosphere.api.common.operators.Operator;
-import eu.stratosphere.api.common.operators.RecordOperator;
-import eu.stratosphere.api.common.operators.util.UserCodeClassWrapper;
-import eu.stratosphere.api.common.operators.util.UserCodeObjectWrapper;
-import eu.stratosphere.api.common.operators.util.UserCodeWrapper;
-import eu.stratosphere.api.java.record.functions.FunctionAnnotation;
-import eu.stratosphere.types.Key;
-import eu.stratosphere.types.Record;
-
-/**
- * MapPartitionOperator that applies a {@link MapPartitionFunction} to each record independently.
- * 
- * @see MapPartitionFunction
- */
-public class MapPartitionOperator extends MapPartitionOperatorBase<Record, Record, MapPartitionFunction> implements RecordOperator {
-	
-	private static String DEFAULT_NAME = "<Unnamed MapPartition>";
-	
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Creates a Builder with the provided {@link MapPartitionFunction} implementation.
-	 * 
-	 * @param udf The {@link MapPartitionFunction} implementation for this Map operator.
-	 */
-	public static Builder builder(MapPartitionFunction udf) {
-		return new Builder(new UserCodeObjectWrapper<MapPartitionFunction>(udf));
-	}
-	
-	/**
-	 * Creates a Builder with the provided {@link MapPartitionFunction} implementation.
-	 * 
-	 * @param udf The {@link MapPartitionFunction} implementation for this Map operator.
-	 */
-	public static Builder builder(Class<? extends MapPartitionFunction> udf) {
-		return new Builder(new UserCodeClassWrapper<MapPartitionFunction>(udf));
-	}
-	
-	/**
-	 * The private constructor that only gets invoked from the Builder.
-	 * @param builder
-	 */
-	protected MapPartitionOperator(Builder builder) {
-
-		super(builder.udf, OperatorInfoHelper.unary(), builder.name);
-		
-		if (builder.inputs != null && !builder.inputs.isEmpty()) {
-			setInput(Operator.createUnionCascade(builder.inputs));
-		}
-		
-		setBroadcastVariables(builder.broadcastInputs);
-		setSemanticProperties(FunctionAnnotation.readSingleConstantAnnotations(builder.udf));
-	}
-	
-
-	@Override
-	public Class<? extends Key<?>>[] getKeyClasses() {
-		return emptyClassArray();
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Builder pattern, straight from Joshua Bloch's Effective Java (2nd Edition).
-	 */
-	public static class Builder {
-		
-		/* The required parameters */
-		private final UserCodeWrapper<MapPartitionFunction> udf;
-		
-		/* The optional parameters */
-		private List<Operator<Record>> inputs;
-		private Map<String, Operator<Record>> broadcastInputs;
-		private String name = DEFAULT_NAME;
-		
-		/**
-		 * Creates a Builder with the provided {@link MapPartitionFunction} implementation.
-		 * 
-		 * @param udf The {@link MapPartitionFunction} implementation for this Map operator.
-		 */
-		private Builder(UserCodeWrapper<MapPartitionFunction> udf) {
-			this.udf = udf;
-			this.inputs = new ArrayList<Operator<Record>>();
-			this.broadcastInputs = new HashMap<String, Operator<Record>>();
-		}
-		
-		/**
-		 * Sets the input.
-		 * 
-		 * @param input The input.
-		 */
-		public Builder input(Operator<Record> input) {
-			Validate.notNull(input, "The input must not be null");
-			
-			this.inputs.clear();
-			this.inputs.add(input);
-			return this;
-		}
-		
-		/**
-		 * Sets one or several inputs (union).
-		 * 
-		 * @param inputs
-		 */
-		public Builder input(Operator<Record>...inputs) {
-			this.inputs.clear();
-			for (Operator<Record> c : inputs) {
-				this.inputs.add(c);
-			}
-			return this;
-		}
-		
-		/**
-		 * Sets the inputs.
-		 * 
-		 * @param inputs
-		 */
-		public Builder inputs(List<Operator<Record>> inputs) {
-			this.inputs = inputs;
-			return this;
-		}
-		
-		/**
-		 * Binds the result produced by a plan rooted at {@code root} to a 
-		 * variable used by the UDF wrapped in this operator.
-		 */
-		public Builder setBroadcastVariable(String name, Operator<Record> input) {
-			this.broadcastInputs.put(name, input);
-			return this;
-		}
-		
-		/**
-		 * Binds multiple broadcast variables.
-		 */
-		public Builder setBroadcastVariables(Map<String, Operator<Record>> inputs) {
-			this.broadcastInputs.clear();
-			this.broadcastInputs.putAll(inputs);
-			return this;
-		}
-		
-		/**
-		 * Sets the name of this operator.
-		 * 
-		 * @param name
-		 */
-		public Builder name(String name) {
-			this.name = name;
-			return this;
-		}
-		
-		/**
-		 * Creates and returns a MapOperator from using the values given 
-		 * to the builder.
-		 * 
-		 * @return The created operator
-		 */
-		public MapPartitionOperator build() {
-			if (name == null) {
-				name = udf.getUserCodeClass().getName();
-			}
-			return new MapPartitionOperator(this);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java
deleted file mode 100644
index 0ec5f15..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.pact.runtime.task;
-
-import eu.stratosphere.api.common.functions.GenericMapPartition;
-import eu.stratosphere.pact.runtime.util.MutableToRegularIteratorWrapper;
-import eu.stratosphere.util.Collector;
-import eu.stratosphere.util.MutableObjectIterator;
-
-
-/**
- * MapPartition task which is executed by a Nephele task manager. The task has a single
- * input and one or multiple outputs. It is provided with a MapFunction
- * implementation.
- * <p>
- * The MapPartitionTask creates an iterator over all key-value pairs of its input and hands that to the <code>map_partition()</code> method
- * of the MapFunction.
- *
- * @see GenericMapPartition
- *
- * @param <IT> The mapper's input data type.
- * @param <OT> The mapper's output data type.
- */
-public class MapPartitionDriver<IT, OT> implements PactDriver<GenericMapPartition<IT, OT>, OT> {
-
-	private PactTaskContext<GenericMapPartition<IT, OT>, OT> taskContext;
-
-	private volatile boolean running;
-
-
-	@Override
-	public void setup(PactTaskContext<GenericMapPartition<IT, OT>, OT> context) {
-		this.taskContext = context;
-		this.running = true;
-	}
-
-	@Override
-		public int getNumberOfInputs() {
-		return 1;
-	}
-
-	@Override
-	public Class<GenericMapPartition<IT, OT>> getStubType() {
-		@SuppressWarnings("unchecked")
-		final Class<GenericMapPartition<IT, OT>> clazz = (Class<GenericMapPartition<IT, OT>>) (Class<?>) GenericMapPartition.class;
-		return clazz;
-	}
-
-	@Override
-	public boolean requiresComparatorOnInput() {
-		return false;
-	}
-
-	@Override
-	public void prepare() {
-		// nothing, since a mapper does not need any preparation
-	}
-
-	@Override
-	public void run() throws Exception {
-		// cache references on the stack
-		final MutableObjectIterator<IT> input = this.taskContext.getInput(0);
-		final GenericMapPartition<IT, OT> function = this.taskContext.getStub();
-		final Collector<OT> output = this.taskContext.getOutputCollector();
-
-		final MutableToRegularIteratorWrapper<IT> inIter = new MutableToRegularIteratorWrapper<IT>(input, this.taskContext.<IT>getInputSerializer(0).getSerializer() );
-		IT record = this.taskContext.<IT>getInputSerializer(0).getSerializer().createInstance();
-
-		function.mapPartition(inIter, output);
-	}
-
-	@Override
-	public void cleanup() {
-		// mappers need no cleanup, since no strategies are used.
-	}
-
-	@Override
-	public void cancel() {
-		this.running = false;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/stratosphere-tests/src/test/java/eu/stratosphere/test/operators/MapPartitionITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/operators/MapPartitionITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/operators/MapPartitionITCase.java
deleted file mode 100644
index 5da0f99..0000000
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/operators/MapPartitionITCase.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.test.operators;
-
-import eu.stratosphere.api.common.Plan;
-import eu.stratosphere.api.java.record.operators.FileDataSink;
-import eu.stratosphere.api.java.record.operators.FileDataSource;
-import eu.stratosphere.api.java.record.functions.MapPartitionFunction;
-import eu.stratosphere.api.java.record.io.DelimitedInputFormat;
-import eu.stratosphere.api.java.record.operators.MapPartitionOperator;
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.test.operators.io.ContractITCaseIOFormats.ContractITCaseInputFormat;
-import eu.stratosphere.test.operators.io.ContractITCaseIOFormats.ContractITCaseOutputFormat;
-import eu.stratosphere.test.util.RecordAPITestBase;
-import eu.stratosphere.types.IntValue;
-import eu.stratosphere.types.Record;
-import eu.stratosphere.types.StringValue;
-import eu.stratosphere.util.Collector;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-
-/**
- * Integration test for the record-API {@link MapPartitionOperator}.
- * <p>
- * Each input line is a "key value" pair. {@link TestMapPartition} keeps pairs whose integer sum is
- * below 10 and emits them as (value, key + 10). The job runs once per configuration returned by
- * {@link #getConfigurations()}.
- */
-@RunWith(Parameterized.class)
-public class MapPartitionITCase extends RecordAPITestBase {
-
-	private static final Log LOG = LogFactory.getLog(MapPartitionITCase.class);
-
-	/** Config key for the degree of parallelism of both the source and the operator under test. */
-	private static final String NO_SUBTASKS_KEY = "MapPartitionTest#NoSubtasks";
-
-	String inPath = null;
-	String resultPath = null;
-
-	public MapPartitionITCase(Configuration testConfig) {
-		super(testConfig);
-	}
-
-	private static final String IN = "1 1\n2 2\n2 8\n4 4\n4 4\n6 6\n7 7\n8 8\n" +
-			"1 1\n2 2\n2 2\n4 4\n4 4\n6 3\n5 9\n8 8\n1 1\n2 2\n2 2\n3 0\n4 4\n" +
-			"5 9\n7 7\n8 8\n1 1\n9 1\n5 9\n4 4\n4 4\n6 6\n7 7\n8 8\n";
-
-	private static final String RESULT = "1 11\n2 12\n4 14\n4 14\n1 11\n2 12\n2 12\n4 14\n4 14\n3 16\n1 11\n2 12\n2 12\n0 13\n4 14\n1 11\n4 14\n4 14\n";
-
-	@Override
-	protected void preSubmit() throws Exception {
-		inPath = createTempFile("in.txt", IN);
-		resultPath = getTempDirPath("result");
-	}
-
-	/**
-	 * Keeps records whose key + value is below 10 and rewrites them as (value, key + 10).
-	 */
-	public static class TestMapPartition extends MapPartitionFunction implements Serializable {
-		private static final long serialVersionUID = 1L;
-
-		// reused across records instead of allocating new values per record
-		private StringValue keyString = new StringValue();
-		private StringValue valueString = new StringValue();
-
-		@Override
-		public void mapPartition(Iterator<Record> records, Collector<Record> out) throws Exception {
-			while (records.hasNext()) {
-				Record record = records.next();
-				keyString = record.getField(0, keyString);
-				valueString = record.getField(1, valueString);
-
-				// guard so the message string is only built when debug logging is on
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("Processed: [" + keyString.toString() + "," + valueString.getValue() + "]");
-				}
-
-				final int key = Integer.parseInt(keyString.toString());
-				final int value = Integer.parseInt(valueString.toString());
-				if (key + value < 10) {
-					record.setField(0, valueString);
-					record.setField(1, new IntValue(key + 10));
-					out.collect(record);
-				}
-			}
-		}
-	}
-
-	@Override
-	protected Plan getTestJob() {
-		FileDataSource input = new FileDataSource(
-				new ContractITCaseInputFormat(), inPath);
-		DelimitedInputFormat.configureDelimitedFormat(input)
-			.recordDelimiter('\n');
-		input.setDegreeOfParallelism(config.getInteger(NO_SUBTASKS_KEY, 1));
-
-		// Same key as the source, so the parameterized parallelism reaches the operator under test.
-		MapPartitionOperator testMapper = MapPartitionOperator.builder(new TestMapPartition()).build();
-		testMapper.setDegreeOfParallelism(config.getInteger(NO_SUBTASKS_KEY, 1));
-
-		FileDataSink output = new FileDataSink(
-				new ContractITCaseOutputFormat(), resultPath);
-		output.setDegreeOfParallelism(1);
-
-		output.setInput(testMapper);
-		testMapper.setInput(input);
-
-		return new Plan(output);
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(RESULT, resultPath);
-	}
-
-	/**
-	 * Supplies the test configurations; currently a single run with four parallel subtasks.
-	 */
-	@Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
-		LinkedList<Configuration> testConfigs = new LinkedList<Configuration>();
-
-		Configuration config = new Configuration();
-		config.setInteger(NO_SUBTASKS_KEY, 4);
-		testConfigs.add(config);
-
-		return toParameterList(testConfigs);
-	}
-}