You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/15 13:08:45 UTC

[3/3] git commit: [FLINK-1053] Add "mapPartition" operator.

[FLINK-1053] Add "mapPartition" operator.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d4de9774
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d4de9774
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d4de9774

Branch: refs/heads/master
Commit: d4de9774b3237bb1850024b1208640bc50f7adab
Parents: a87559a
Author: kay <fl...@googlemail.com>
Authored: Wed Jun 25 11:57:45 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Aug 14 22:43:26 2014 +0200

----------------------------------------------------------------------
 .../org/apache/flink/compiler/PactCompiler.java |   4 +-
 .../flink/compiler/costs/CostEstimator.java     |   1 +
 .../java/org/apache/flink/api/java/DataSet.java |  27 +++
 .../flink/runtime/operators/DriverStrategy.java |   4 +
 .../compiler/dag/MapPartitionNode.java          |  56 ++++++
 .../operators/MapPartitionDescriptor.java       |  60 ++++++
 .../common/functions/GenericMapPartition.java   |  16 ++
 .../base/MapPartitionOperatorBase.java          |  43 +++++
 .../java/functions/MapPartitionFunction.java    |  36 ++++
 .../java/operators/MapPartitionOperator.java    |  67 +++++++
 .../record/functions/MapPartitionFunction.java  |  44 +++++
 .../record/operators/MapPartitionOperator.java  | 190 +++++++++++++++++++
 .../pact/runtime/task/MapPartitionDriver.java   |  92 +++++++++
 .../test/operators/MapPartitionITCase.java      | 130 +++++++++++++
 14 files changed, 769 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
index e22a365..f16a0a2 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler;
 
 import java.util.ArrayDeque;
@@ -671,6 +670,9 @@ public class PactCompiler {
 			else if (c instanceof MapOperatorBase) {
 				n = new MapNode((MapOperatorBase<?, ?, ?>) c);
 			}
+			else if (c instanceof MapPartitionOperatorBase) {
+				n = new MapPartitionNode((MapPartitionOperatorBase<?, ?, ?>) c);
+			}
 			else if (c instanceof org.apache.flink.api.common.operators.base.CollectorMapOperatorBase) {
 				n = new CollectorMapNode((org.apache.flink.api.common.operators.base.CollectorMapOperatorBase<?, ?, ?>) c);
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/flink-compiler/src/main/java/org/apache/flink/compiler/costs/CostEstimator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/costs/CostEstimator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/costs/CostEstimator.java
index b09f82f..cf11424 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/costs/CostEstimator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/costs/CostEstimator.java
@@ -168,6 +168,7 @@ public abstract class CostEstimator {
 		case BINARY_NO_OP:	
 		case COLLECTOR_MAP:
 		case MAP:
+		case MAP_PARTITION:
 		case FLAT_MAP:
 			
 		case ALL_GROUP_REDUCE:

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 5da572d..2b15007 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -140,6 +140,33 @@ public abstract class DataSet<T> {
 		}
 		return new MapOperator<T, R>(this, mapper);
 	}
+
+
+
+    /**
+     * Applies a Map operation to the entire partition of the data.
+	 * The function is called once per parallel partition of the data,
+	 * and the entire partition is available through the given Iterator.
+	 * The number of elements that each instance of the MapPartition function
+	 * sees is non-deterministic and depends on the degree of parallelism of the operation.
+	 *
+	 * This function is intended for operations that cannot transform individual elements
+	 * and require no grouping of elements. To transform individual elements,
+	 * the use of {@code map()} and {@code flatMap()} is preferable.
+	 *
+	 * @param mapPartition The MapPartitionFunction that is called once for each parallel partition of the DataSet.
+     * @return A MapPartitionOperator that represents the transformed DataSet.
+     *
+     * @see MapPartitionFunction
+     * @see MapPartitionOperator
+     * @see DataSet
+     */
+	public <R> MapPartitionOperator<T, R> mapPartition(MapPartitionFunction<T, R> mapPartition ){
+		if (mapPartition == null) {
+			throw new NullPointerException("MapPartition function must not be null.");
+		}
+		return new MapPartitionOperator<T, R>(this, mapPartition);
+	}
 	
 	/**
 	 * Applies a FlatMap transformation on a {@link DataSet}.<br/>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
index 5f00277..bb14539 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
@@ -44,6 +44,10 @@ public enum DriverStrategy {
 	COLLECTOR_MAP(CollectorMapDriver.class, ChainedCollectorMapDriver.class, PIPELINED, false),
 	// the proper mapper
 	MAP(MapDriver.class, ChainedMapDriver.class, PIPELINED, false),
+
+	// the proper map partition
+	MAP_PARTITION(MapPartitionDriver.class, null, PIPELINED, false),
+
 	// the flat mapper
 	FLAT_MAP(FlatMapDriver.class, ChainedFlatMapDriver.class, PIPELINED, false),
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/MapPartitionNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/MapPartitionNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/MapPartitionNode.java
new file mode 100644
index 0000000..0d5e4ad
--- /dev/null
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/MapPartitionNode.java
@@ -0,0 +1,56 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.compiler.dag;
+
+import java.util.Collections;
+import java.util.List;
+
+import eu.stratosphere.api.common.operators.SingleInputOperator;
+import eu.stratosphere.compiler.DataStatistics;
+import eu.stratosphere.compiler.operators.MapPartitionDescriptor;
+import eu.stratosphere.compiler.operators.OperatorDescriptorSingle;
+
+/**
+ * The optimizer's internal representation of a <i>MapPartition</i> operator node.
+ */
+public class MapPartitionNode extends SingleInputNode {
+	
+	/**
+	 * Creates a new MapPartitionNode for the given contract.
+	 * 
+	 * @param operator The map partition contract object.
+	 */
+	public MapPartitionNode(SingleInputOperator<?, ?, ?> operator) {
+		super(operator);
+	}
+
+	@Override
+	public String getName() {
+		return "MapPartition";
+	}
+
+	@Override
+	protected List<OperatorDescriptorSingle> getPossibleProperties() {
+		return Collections.<OperatorDescriptorSingle>singletonList(new MapPartitionDescriptor());
+	}
+
+	/**
+	 * Computes the estimates for the MapPartition operator.
+	 * A MapPartition function receives a whole partition and may emit any number of elements,
+	 * so no operator-specific default estimates are set here.
+	 */
+	@Override
+	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/MapPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/MapPartitionDescriptor.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/MapPartitionDescriptor.java
new file mode 100644
index 0000000..41b707d
--- /dev/null
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/MapPartitionDescriptor.java
@@ -0,0 +1,60 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.compiler.operators;
+
+import java.util.Collections;
+import java.util.List;
+
+import eu.stratosphere.compiler.dag.SingleInputNode;
+import eu.stratosphere.compiler.dataproperties.GlobalProperties;
+import eu.stratosphere.compiler.dataproperties.LocalProperties;
+import eu.stratosphere.compiler.dataproperties.RequestedGlobalProperties;
+import eu.stratosphere.compiler.dataproperties.RequestedLocalProperties;
+import eu.stratosphere.compiler.plan.Channel;
+import eu.stratosphere.compiler.plan.SingleInputPlanNode;
+import eu.stratosphere.pact.runtime.task.DriverStrategy;
+
+
+public class MapPartitionDescriptor extends OperatorDescriptorSingle {
+
+	@Override
+	public DriverStrategy getStrategy() {
+		return DriverStrategy.MAP_PARTITION;
+	}
+
+	@Override
+	public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
+		return new SingleInputPlanNode(node, "MapPartition ("+node.getPactContract().getName()+")", in, DriverStrategy.MAP_PARTITION);
+	}
+
+	@Override
+	protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
+		return Collections.singletonList(new RequestedGlobalProperties());
+	}
+
+	@Override
+	protected List<RequestedLocalProperties> createPossibleLocalProperties() {
+		return Collections.singletonList(new RequestedLocalProperties());
+	}
+	
+	@Override
+	public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
+		return gProps;
+	}
+	
+	@Override
+	public LocalProperties computeLocalProperties(LocalProperties lProps) {
+		return lProps;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericMapPartition.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericMapPartition.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericMapPartition.java
new file mode 100644
index 0000000..accf731
--- /dev/null
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericMapPartition.java
@@ -0,0 +1,16 @@
+package eu.stratosphere.api.common.functions;
+
+import eu.stratosphere.util.Collector;
+
+import java.util.Iterator;
+
+public interface GenericMapPartition<T, O> extends Function {
+	/**
+	 * A user-implemented function that modifies or transforms the incoming records of a partition.
+	 *
+	 * @param records All records for the mapper
+	 * @param out The collector to hand results to.
+	 * @throws Exception
+	 */
+	void mapPartition(Iterator<T> records, Collector<O> out) throws Exception;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/base/MapPartitionOperatorBase.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/base/MapPartitionOperatorBase.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/base/MapPartitionOperatorBase.java
new file mode 100644
index 0000000..9e3552a
--- /dev/null
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/base/MapPartitionOperatorBase.java
@@ -0,0 +1,43 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.api.common.operators.base;
+
+import eu.stratosphere.api.common.functions.GenericMapPartition;
+import eu.stratosphere.api.common.operators.SingleInputOperator;
+import eu.stratosphere.api.common.operators.UnaryOperatorInformation;
+import eu.stratosphere.api.common.operators.util.UserCodeClassWrapper;
+import eu.stratosphere.api.common.operators.util.UserCodeObjectWrapper;
+import eu.stratosphere.api.common.operators.util.UserCodeWrapper;
+
+
+/**
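+ * Operator base class for the MapPartition transformation, parameterized by a {@link GenericMapPartition} user function.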
+ * @param <IN> The input type.
+ * @param <OUT> The result type.
+ * @param <FT> The type of the user-defined function.
+ */
+public class MapPartitionOperatorBase<IN, OUT, FT extends GenericMapPartition<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
+	
+	public MapPartitionOperatorBase(UserCodeWrapper<FT> udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {
+		super(udf, operatorInfo, name);
+	}
+	
+	public MapPartitionOperatorBase(FT udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {
+		super(new UserCodeObjectWrapper<FT>(udf), operatorInfo, name);
+	}
+	
+	public MapPartitionOperatorBase(Class<? extends FT> udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {
+		super(new UserCodeClassWrapper<FT>(udf), operatorInfo, name);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/MapPartitionFunction.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/MapPartitionFunction.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/MapPartitionFunction.java
new file mode 100644
index 0000000..4c0155f
--- /dev/null
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/MapPartitionFunction.java
@@ -0,0 +1,36 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.api.java.functions;
+
+
+import eu.stratosphere.api.common.functions.AbstractFunction;
+import eu.stratosphere.api.common.functions.GenericMapPartition;
+import eu.stratosphere.util.Collector;
+
+import java.util.Iterator;
+
+public abstract class MapPartitionFunction<IN, OUT> extends AbstractFunction implements GenericMapPartition<IN, OUT> {
+
+	private static final long serialVersionUID = 1L;
+	/**
+	 *
+	 * @param records All records for the mapper
+	 * @param out The collector to hand results to.
+	 *
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+	 *                   to fail and may trigger recovery.
+	 */
+	@Override
+	public abstract void mapPartition(Iterator<IN> records, Collector<OUT> out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/MapPartitionOperator.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/MapPartitionOperator.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/MapPartitionOperator.java
new file mode 100644
index 0000000..836b205
--- /dev/null
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/MapPartitionOperator.java
@@ -0,0 +1,67 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+package eu.stratosphere.api.java.operators;
+
+import eu.stratosphere.api.common.functions.GenericMapPartition;
+import eu.stratosphere.api.common.operators.Operator;
+import eu.stratosphere.api.common.operators.UnaryOperatorInformation;
+import eu.stratosphere.api.common.operators.base.MapPartitionOperatorBase;
+import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.functions.MapFunction;
+import eu.stratosphere.api.java.functions.MapPartitionFunction;
+import eu.stratosphere.api.java.typeutils.TypeExtractor;
+
+/**
+ * This operator represents the application of a "mapPartition" function on a data set, and the
+ * result data set produced by the function.
+ * 
+ * @param <IN> The type of the data set consumed by the operator.
+ * @param <OUT> The type of the data set created by the operator.
+ * 
+ * @see MapPartitionFunction
+ */
+public class MapPartitionOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapPartitionOperator<IN, OUT>> {
+	
+	protected final MapPartitionFunction<IN, OUT> function;
+	
+	
+	public MapPartitionOperator(DataSet<IN> input, MapPartitionFunction<IN, OUT> function) {
+		super(input, TypeExtractor.getMapPartitionReturnTypes(function, input.getType()));
+		
+		this.function = function;
+		extractSemanticAnnotationsFromUdf(function.getClass());
+	}
+
+	@Override
+	protected eu.stratosphere.api.common.operators.base.MapPartitionOperatorBase<IN, OUT, GenericMapPartition<IN, OUT>> translateToDataFlow(Operator<IN> input) {
+		
+		String name = getName() != null ? getName() : function.getClass().getName();
+		// create operator
+		MapPartitionOperatorBase<IN, OUT, GenericMapPartition<IN, OUT>> po = new MapPartitionOperatorBase<IN, OUT, GenericMapPartition<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
+		// set input
+		po.setInput(input);
+		// set dop
+		if(this.getParallelism() > 0) {
+			// use specified dop
+			po.setDegreeOfParallelism(this.getParallelism());
+		} else {
+			// if no dop has been specified, use dop of input operator to enable chaining
+			po.setDegreeOfParallelism(input.getDegreeOfParallelism());
+		}
+		
+		return po;
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/MapPartitionFunction.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/MapPartitionFunction.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/MapPartitionFunction.java
new file mode 100644
index 0000000..5284d06
--- /dev/null
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/MapPartitionFunction.java
@@ -0,0 +1,44 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.api.java.record.functions;
+
+import eu.stratosphere.api.common.functions.AbstractFunction;
+import eu.stratosphere.api.common.functions.GenericMapPartition;
+import eu.stratosphere.types.Record;
+import eu.stratosphere.util.Collector;
+
+import java.util.Iterator;
+
+/**
+ * The MapPartitionFunction must be extended to provide a map partition implementation.
+ * By definition, the map partition is called for a full input set.
+ */
+public abstract class MapPartitionFunction extends AbstractFunction implements GenericMapPartition<Record, Record> {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * This method must be implemented to provide a user implementation of a map partition function.
+	 * It is called for a full input set.
+	 *
+	 * @param records all input records
+	 * @param out A collector that collects all output records.
+	 *
+	 * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the
+	 *                   runtime catches an exception, it aborts the map task and lets the fail-over logic
+	 *                   decide whether to retry the mapper execution.
+	 */
+	@Override
+	public abstract void mapPartition(Iterator<Record> records, Collector<Record> out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/MapPartitionOperator.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/MapPartitionOperator.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/MapPartitionOperator.java
new file mode 100644
index 0000000..1ac132c
--- /dev/null
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/MapPartitionOperator.java
@@ -0,0 +1,190 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.api.java.record.operators;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import eu.stratosphere.api.common.operators.base.MapPartitionOperatorBase;
+import eu.stratosphere.api.java.record.functions.MapPartitionFunction;
+import org.apache.commons.lang3.Validate;
+
+
+
+import eu.stratosphere.api.common.operators.Operator;
+import eu.stratosphere.api.common.operators.RecordOperator;
+import eu.stratosphere.api.common.operators.util.UserCodeClassWrapper;
+import eu.stratosphere.api.common.operators.util.UserCodeObjectWrapper;
+import eu.stratosphere.api.common.operators.util.UserCodeWrapper;
+import eu.stratosphere.api.java.record.functions.FunctionAnnotation;
+import eu.stratosphere.types.Key;
+import eu.stratosphere.types.Record;
+
+/**
+ * MapPartitionOperator that applies a {@link MapPartitionFunction} to each parallel partition of its input.
+ * 
+ * @see MapPartitionFunction
+ */
+public class MapPartitionOperator extends MapPartitionOperatorBase<Record, Record, MapPartitionFunction> implements RecordOperator {
+	
+	private static String DEFAULT_NAME = "<Unnamed MapPartition>";
+	
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Creates a Builder with the provided {@link MapPartitionFunction} implementation.
+	 * 
+	 * @param udf The {@link MapPartitionFunction} implementation for this MapPartition operator.
+	 */
+	public static Builder builder(MapPartitionFunction udf) {
+		return new Builder(new UserCodeObjectWrapper<MapPartitionFunction>(udf));
+	}
+	
+	/**
+	 * Creates a Builder with the provided {@link MapPartitionFunction} implementation class.
+	 * 
+	 * @param udf The {@link MapPartitionFunction} implementation class for this MapPartition operator.
+	 */
+	public static Builder builder(Class<? extends MapPartitionFunction> udf) {
+		return new Builder(new UserCodeClassWrapper<MapPartitionFunction>(udf));
+	}
+	
+	/**
+	 * The protected constructor, intended to be invoked from the Builder.
+	 * @param builder The builder that supplies the UDF, the inputs, the broadcast inputs, and the name.
+	 */
+	protected MapPartitionOperator(Builder builder) {
+
+		super(builder.udf, OperatorInfoHelper.unary(), builder.name);
+		
+		if (builder.inputs != null && !builder.inputs.isEmpty()) {
+			setInput(Operator.createUnionCascade(builder.inputs));
+		}
+		
+		setBroadcastVariables(builder.broadcastInputs);
+		setSemanticProperties(FunctionAnnotation.readSingleConstantAnnotations(builder.udf));
+	}
+	
+
+	@Override
+	public Class<? extends Key<?>>[] getKeyClasses() {
+		return emptyClassArray();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Builder pattern, straight from Joshua Bloch's Effective Java (2nd Edition).
+	 */
+	public static class Builder {
+		
+		/* The required parameters */
+		private final UserCodeWrapper<MapPartitionFunction> udf;
+		
+		/* The optional parameters */
+		private List<Operator<Record>> inputs;
+		private Map<String, Operator<Record>> broadcastInputs;
+		private String name = DEFAULT_NAME;
+		
+		/**
+		 * Creates a Builder with the provided {@link MapPartitionFunction} implementation.
+		 * 
+		 * @param udf The {@link MapPartitionFunction} implementation for this MapPartition operator.
+		 */
+		private Builder(UserCodeWrapper<MapPartitionFunction> udf) {
+			this.udf = udf;
+			this.inputs = new ArrayList<Operator<Record>>();
+			this.broadcastInputs = new HashMap<String, Operator<Record>>();
+		}
+		
+		/**
+		 * Sets the input.
+		 * 
+		 * @param input The input.
+		 */
+		public Builder input(Operator<Record> input) {
+			Validate.notNull(input, "The input must not be null");
+			
+			this.inputs.clear();
+			this.inputs.add(input);
+			return this;
+		}
+		
+		/**
+		 * Sets one or several inputs (union).
+		 * 
+		 * @param inputs
+		 */
+		public Builder input(Operator<Record>...inputs) {
+			this.inputs.clear();
+			for (Operator<Record> c : inputs) {
+				this.inputs.add(c);
+			}
+			return this;
+		}
+		
+		/**
+		 * Sets the inputs.
+		 * 
+		 * @param inputs
+		 */
+		public Builder inputs(List<Operator<Record>> inputs) {
+			this.inputs = inputs;
+			return this;
+		}
+		
+		/**
+		 * Binds the result produced by a plan rooted at {@code input} to a 
+		 * broadcast variable named {@code name}, used by the UDF wrapped in this operator.
+		 */
+		public Builder setBroadcastVariable(String name, Operator<Record> input) {
+			this.broadcastInputs.put(name, input);
+			return this;
+		}
+		
+		/**
+		 * Binds multiple broadcast variables.
+		 */
+		public Builder setBroadcastVariables(Map<String, Operator<Record>> inputs) {
+			this.broadcastInputs.clear();
+			this.broadcastInputs.putAll(inputs);
+			return this;
+		}
+		
+		/**
+		 * Sets the name of this operator.
+		 * 
+		 * @param name
+		 */
+		public Builder name(String name) {
+			this.name = name;
+			return this;
+		}
+		
+		/**
+		 * Creates and returns a MapPartitionOperator using the values given 
+		 * to the builder.
+		 * 
+		 * @return The created operator
+		 */
+		public MapPartitionOperator build() {
+			if (name == null) {
+				name = udf.getUserCodeClass().getName();
+			}
+			return new MapPartitionOperator(this);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java
new file mode 100644
index 0000000..0ec5f15
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java
@@ -0,0 +1,92 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.pact.runtime.task;
+
+import eu.stratosphere.api.common.functions.GenericMapPartition;
+import eu.stratosphere.pact.runtime.util.MutableToRegularIteratorWrapper;
+import eu.stratosphere.util.Collector;
+import eu.stratosphere.util.MutableObjectIterator;
+
+
+/**
+ * Driver for the MapPartition operator. The driver has a single input and is
+ * parameterized with a {@link GenericMapPartition} user function.
+ * <p>
+ * Instead of invoking the user function once per record, the driver wraps its input
+ * in an iterator and hands that iterator, together with the output collector, to a
+ * single call of the function's <code>mapPartition()</code> method.
+ *
+ * @see GenericMapPartition
+ *
+ * @param <IT> The mapper's input data type.
+ * @param <OT> The mapper's output data type.
+ */
+public class MapPartitionDriver<IT, OT> implements PactDriver<GenericMapPartition<IT, OT>, OT> {
+
+	/** The task context supplying input, user function and output collector. */
+	private PactTaskContext<GenericMapPartition<IT, OT>, OT> taskContext;
+
+	/**
+	 * Set to true in {@link #setup} and to false in {@link #cancel()}. It is not
+	 * consulted by {@link #run()}, because iteration over the input happens inside
+	 * the user function.
+	 */
+	private volatile boolean running;
+
+
+	/**
+	 * Stores the task context and marks the driver as running.
+	 *
+	 * @param context The context this driver obtains its input, function and output from.
+	 */
+	@Override
+	public void setup(PactTaskContext<GenericMapPartition<IT, OT>, OT> context) {
+		this.taskContext = context;
+		this.running = true;
+	}
+
+	/**
+	 * @return Always 1; this driver has exactly one input.
+	 */
+	@Override
+	public int getNumberOfInputs() {
+		return 1;
+	}
+
+	/**
+	 * @return The {@link GenericMapPartition} class as the type of user function run by this driver.
+	 */
+	@Override
+	public Class<GenericMapPartition<IT, OT>> getStubType() {
+		// The double cast is needed because a class literal cannot carry type arguments.
+		@SuppressWarnings("unchecked")
+		final Class<GenericMapPartition<IT, OT>> clazz = (Class<GenericMapPartition<IT, OT>>) (Class<?>) GenericMapPartition.class;
+		return clazz;
+	}
+
+	/**
+	 * @return Always false; no comparator is requested for the input.
+	 */
+	@Override
+	public boolean requiresComparatorOnInput() {
+		return false;
+	}
+
+	@Override
+	public void prepare() {
+		// nothing, since a mapper does not need any preparation
+	}
+
+	/**
+	 * Wraps the input in a regular iterator and invokes the user function exactly
+	 * once with that iterator and the output collector.
+	 *
+	 * @throws Exception Anything thrown while obtaining the input or by the user function.
+	 */
+	@Override
+	public void run() throws Exception {
+		// cache references on the stack
+		final MutableObjectIterator<IT> input = this.taskContext.getInput(0);
+		final GenericMapPartition<IT, OT> function = this.taskContext.getStub();
+		final Collector<OT> output = this.taskContext.getOutputCollector();
+
+		// The wrapper is given the input's serializer; no separate record instance
+		// is needed here, so none is created.
+		final MutableToRegularIteratorWrapper<IT> inIter = new MutableToRegularIteratorWrapper<IT>(input, this.taskContext.<IT>getInputSerializer(0).getSerializer());
+
+		function.mapPartition(inIter, output);
+	}
+
+	@Override
+	public void cleanup() {
+		// mappers need no cleanup, since no strategies are used.
+	}
+
+	/**
+	 * Clears the running flag.
+	 */
+	@Override
+	public void cancel() {
+		this.running = false;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/stratosphere-tests/src/test/java/eu/stratosphere/test/operators/MapPartitionITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/operators/MapPartitionITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/operators/MapPartitionITCase.java
new file mode 100644
index 0000000..5da0f99
--- /dev/null
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/operators/MapPartitionITCase.java
@@ -0,0 +1,130 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.test.operators;
+
+import eu.stratosphere.api.common.Plan;
+import eu.stratosphere.api.java.record.operators.FileDataSink;
+import eu.stratosphere.api.java.record.operators.FileDataSource;
+import eu.stratosphere.api.java.record.functions.MapPartitionFunction;
+import eu.stratosphere.api.java.record.io.DelimitedInputFormat;
+import eu.stratosphere.api.java.record.operators.MapPartitionOperator;
+import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.test.operators.io.ContractITCaseIOFormats.ContractITCaseInputFormat;
+import eu.stratosphere.test.operators.io.ContractITCaseIOFormats.ContractITCaseOutputFormat;
+import eu.stratosphere.test.util.RecordAPITestBase;
+import eu.stratosphere.types.IntValue;
+import eu.stratosphere.types.Record;
+import eu.stratosphere.types.StringValue;
+import eu.stratosphere.util.Collector;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+/**
+ * Integration test for the record API {@link MapPartitionOperator}.
+ * <p>
+ * The job reads the key/value lines in {@link #IN}, runs them through
+ * {@link TestMapPartition} and compares the written output against {@link #RESULT}.
+ */
+@RunWith(Parameterized.class)
+public class MapPartitionITCase extends RecordAPITestBase {
+	
+	private static final Log LOG = LogFactory.getLog(MapPartitionITCase.class);
+
+	/** Configuration key for the degree of parallelism of the source and the mapper. */
+	private static final String NUM_SUBTASKS_KEY = "MapPartitionTest#NoSubtasks";
+
+	String inPath = null;
+	String resultPath = null;
+	
+	public MapPartitionITCase(Configuration testConfig) {
+		super(testConfig);
+	}
+
+	/** Test input, one "key value" pair per line. */
+	private static final String IN = "1 1\n2 2\n2 8\n4 4\n4 4\n6 6\n7 7\n8 8\n" +
+			"1 1\n2 2\n2 2\n4 4\n4 4\n6 3\n5 9\n8 8\n1 1\n2 2\n2 2\n3 0\n4 4\n" +
+			"5 9\n7 7\n8 8\n1 1\n9 1\n5 9\n4 4\n4 4\n6 6\n7 7\n8 8\n";
+
+	/** Expected output: "value key+10" for every input pair with key + value < 10. */
+	private static final String RESULT = "1 11\n2 12\n4 14\n4 14\n1 11\n2 12\n2 12\n4 14\n4 14\n3 16\n1 11\n2 12\n2 12\n0 13\n4 14\n1 11\n4 14\n4 14\n";
+
+	@Override
+	protected void preSubmit() throws Exception {
+		inPath = createTempFile("in.txt", IN);
+		resultPath = getTempDirPath("result");
+	}
+
+	/**
+	 * Emits, for every record whose integer key and value sum to less than 10,
+	 * a record with the original value in field 0 and (key + 10) in field 1.
+	 * All other records are dropped.
+	 */
+	public static class TestMapPartition extends MapPartitionFunction implements Serializable {
+		private static final long serialVersionUID = 1L;
+
+		private StringValue keyString = new StringValue();
+		private StringValue valueString = new StringValue();
+
+		@Override
+		public void mapPartition(Iterator<Record> records, Collector<Record> out) throws Exception {
+			while (records.hasNext()) {
+				Record record = records.next();
+				keyString = record.getField(0, keyString);
+				valueString = record.getField(1, valueString);
+
+				// only build the message when it will actually be logged
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Processed: [" + keyString.toString() + "," + valueString.getValue() + "]");
+				}
+
+				// parse each field once; key first, then value, as before
+				final int key = Integer.parseInt(keyString.toString());
+				final int value = Integer.parseInt(valueString.toString());
+
+				if (key + value < 10) {
+					record.setField(0, valueString);
+					record.setField(1, new IntValue(key + 10));
+
+					out.collect(record);
+				}
+			}
+		}
+	}
+
+	@Override
+	protected Plan getTestJob() {
+		FileDataSource input = new FileDataSource(
+				new ContractITCaseInputFormat(), inPath);
+		DelimitedInputFormat.configureDelimitedFormat(input)
+			.recordDelimiter('\n');
+		input.setDegreeOfParallelism(config.getInteger(NUM_SUBTASKS_KEY, 1));
+
+		MapPartitionOperator testMapper = MapPartitionOperator.builder(new TestMapPartition()).build();
+		// Use the same key as the source. The key read here previously was never set by
+		// getConfigurations(), so the mapper always ran with a parallelism of 1.
+		testMapper.setDegreeOfParallelism(config.getInteger(NUM_SUBTASKS_KEY, 1));
+
+		FileDataSink output = new FileDataSink(
+				new ContractITCaseOutputFormat(), resultPath);
+		output.setDegreeOfParallelism(1);
+
+		output.setInput(testMapper);
+		testMapper.setInput(input);
+
+		return new Plan(output);
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(RESULT, resultPath);
+	}
+
+	/**
+	 * @return The test parameterizations: a single configuration that sets the
+	 *         degree of parallelism to 4.
+	 */
+	@Parameters
+	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+		LinkedList<Configuration> testConfigs = new LinkedList<Configuration>();
+
+		Configuration config = new Configuration();
+		config.setInteger(NUM_SUBTASKS_KEY, 4);
+		testConfigs.add(config);
+
+		return toParameterList(testConfigs);
+	}
+}