You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/15 13:08:45 UTC
[3/3] git commit: [FLINK-1053] Add "mapPartition" operator.
[FLINK-1053] Add "mapPartition" operator.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d4de9774
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d4de9774
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d4de9774
Branch: refs/heads/master
Commit: d4de9774b3237bb1850024b1208640bc50f7adab
Parents: a87559a
Author: kay <fl...@googlemail.com>
Authored: Wed Jun 25 11:57:45 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Aug 14 22:43:26 2014 +0200
----------------------------------------------------------------------
.../org/apache/flink/compiler/PactCompiler.java | 4 +-
.../flink/compiler/costs/CostEstimator.java | 1 +
.../java/org/apache/flink/api/java/DataSet.java | 27 +++
.../flink/runtime/operators/DriverStrategy.java | 4 +
.../compiler/dag/MapPartitionNode.java | 56 ++++++
.../operators/MapPartitionDescriptor.java | 60 ++++++
.../common/functions/GenericMapPartition.java | 16 ++
.../base/MapPartitionOperatorBase.java | 43 +++++
.../java/functions/MapPartitionFunction.java | 36 ++++
.../java/operators/MapPartitionOperator.java | 67 +++++++
.../record/functions/MapPartitionFunction.java | 44 +++++
.../record/operators/MapPartitionOperator.java | 190 +++++++++++++++++++
.../pact/runtime/task/MapPartitionDriver.java | 92 +++++++++
.../test/operators/MapPartitionITCase.java | 130 +++++++++++++
14 files changed, 769 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
index e22a365..f16a0a2 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.compiler;
import java.util.ArrayDeque;
@@ -671,6 +670,9 @@ public class PactCompiler {
else if (c instanceof MapOperatorBase) {
n = new MapNode((MapOperatorBase<?, ?, ?>) c);
}
+ else if (c instanceof MapPartitionOperatorBase) {
+ n = new MapPartitionNode((MapPartitionOperatorBase<?, ?, ?>) c);
+ }
else if (c instanceof org.apache.flink.api.common.operators.base.CollectorMapOperatorBase) {
n = new CollectorMapNode((org.apache.flink.api.common.operators.base.CollectorMapOperatorBase<?, ?, ?>) c);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/flink-compiler/src/main/java/org/apache/flink/compiler/costs/CostEstimator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/costs/CostEstimator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/costs/CostEstimator.java
index b09f82f..cf11424 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/costs/CostEstimator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/costs/CostEstimator.java
@@ -168,6 +168,7 @@ public abstract class CostEstimator {
case BINARY_NO_OP:
case COLLECTOR_MAP:
case MAP:
+ case MAP_PARTITION:
case FLAT_MAP:
case ALL_GROUP_REDUCE:
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 5da572d..2b15007 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -140,6 +140,33 @@ public abstract class DataSet<T> {
}
return new MapOperator<T, R>(this, mapper);
}
+
+
+
+	/**
+	 * Applies a MapPartition transformation on a {@link DataSet}.<br/>
+	 * The transformation calls a {@link MapPartitionFunction} once per parallel partition of the
+	 * DataSet, and the entire partition is handed to the function through a {@link java.util.Iterator}.
+	 * The number of elements that each instance of the MapPartition function sees is
+	 * non-deterministic and depends on the degree of parallelism of the operation.<br/>
+	 * This transformation is intended for operations that cannot work on individual elements and
+	 * that require no grouping of elements. To transform individual elements,
+	 * {@code map()} and {@code flatMap()} are preferable.
+	 *
+	 * @param mapPartition The MapPartitionFunction that is called once for each parallel partition of the DataSet.
+	 * @return A MapPartitionOperator that represents the transformed DataSet.
+	 * @throws NullPointerException If {@code mapPartition} is null.
+	 *
+	 * @see MapPartitionFunction
+	 * @see MapPartitionOperator
+	 * @see DataSet
+	 */
+	public <R> MapPartitionOperator<T, R> mapPartition(MapPartitionFunction<T, R> mapPartition) {
+		if (mapPartition == null) {
+			throw new NullPointerException("MapPartition function must not be null.");
+		}
+		return new MapPartitionOperator<T, R>(this, mapPartition);
+	}
/**
* Applies a FlatMap transformation on a {@link DataSet}.<br/>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
index 5f00277..bb14539 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
@@ -44,6 +44,10 @@ public enum DriverStrategy {
COLLECTOR_MAP(CollectorMapDriver.class, ChainedCollectorMapDriver.class, PIPELINED, false),
// the proper mapper
MAP(MapDriver.class, ChainedMapDriver.class, PIPELINED, false),
+
+ // the proper map partition
+ MAP_PARTITION(MapPartitionDriver.class, null, PIPELINED, false),
+
// the flat mapper
FLAT_MAP(FlatMapDriver.class, ChainedFlatMapDriver.class, PIPELINED, false),
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/MapPartitionNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/MapPartitionNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/MapPartitionNode.java
new file mode 100644
index 0000000..0d5e4ad
--- /dev/null
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/MapPartitionNode.java
@@ -0,0 +1,56 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.compiler.dag;
+
+import java.util.Collections;
+import java.util.List;
+
+import eu.stratosphere.api.common.operators.SingleInputOperator;
+import eu.stratosphere.compiler.DataStatistics;
+import eu.stratosphere.compiler.operators.MapPartitionDescriptor;
+import eu.stratosphere.compiler.operators.OperatorDescriptorSingle;
+
+/**
+ * The optimizer's internal representation of a <i>MapPartition</i> operator node.
+ */
+public class MapPartitionNode extends SingleInputNode {
+	
+	/**
+	 * Creates a new MapPartitionNode for the given operator.
+	 * 
+	 * @param operator The map partition operator (contract) object.
+	 */
+	public MapPartitionNode(SingleInputOperator<?, ?, ?> operator) {
+		super(operator);
+	}
+
+	@Override
+	public String getName() {
+		return "MapPartition";
+	}
+
+	/**
+	 * A MapPartition node has exactly one execution option, described by a
+	 * {@link MapPartitionDescriptor}.
+	 */
+	@Override
+	protected List<OperatorDescriptorSingle> getPossibleProperties() {
+		return Collections.<OperatorDescriptorSingle>singletonList(new MapPartitionDescriptor());
+	}
+
+	/**
+	 * Computes the estimates for the MapPartition operator.
+	 * Unlike Map, a MapPartition function receives a whole partition at once and may emit any
+	 * number of records, so no default estimate for the output cardinality can be derived
+	 * from the input. This method therefore deliberately sets no estimates.
+	 */
+	@Override
+	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
+		// intentionally empty: the output cardinality of a MapPartition function is unknown
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/MapPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/MapPartitionDescriptor.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/MapPartitionDescriptor.java
new file mode 100644
index 0000000..41b707d
--- /dev/null
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/MapPartitionDescriptor.java
@@ -0,0 +1,60 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.compiler.operators;
+
+import java.util.Collections;
+import java.util.List;
+
+import eu.stratosphere.compiler.dag.SingleInputNode;
+import eu.stratosphere.compiler.dataproperties.GlobalProperties;
+import eu.stratosphere.compiler.dataproperties.LocalProperties;
+import eu.stratosphere.compiler.dataproperties.RequestedGlobalProperties;
+import eu.stratosphere.compiler.dataproperties.RequestedLocalProperties;
+import eu.stratosphere.compiler.plan.Channel;
+import eu.stratosphere.compiler.plan.SingleInputPlanNode;
+import eu.stratosphere.pact.runtime.task.DriverStrategy;
+
+
+/**
+ * Operator descriptor for the MapPartition operator. It offers the single driver strategy
+ * {@link DriverStrategy#MAP_PARTITION}, requests default-constructed (presumably unconstrained)
+ * global and local properties from its input, and passes the input's properties through unchanged.
+ */
+public class MapPartitionDescriptor extends OperatorDescriptorSingle {
+
+	@Override
+	public DriverStrategy getStrategy() {
+		return DriverStrategy.MAP_PARTITION;
+	}
+
+	/**
+	 * Creates the plan node, labelled "MapPartition (&lt;operator name&gt;)".
+	 */
+	@Override
+	public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
+		final String operatorName = node.getPactContract().getName();
+		final String planNodeName = "MapPartition (" + operatorName + ")";
+		return new SingleInputPlanNode(node, planNodeName, in, DriverStrategy.MAP_PARTITION);
+	}
+
+	/** Requests one default-constructed set of global properties. */
+	@Override
+	protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
+		final RequestedGlobalProperties requested = new RequestedGlobalProperties();
+		return Collections.singletonList(requested);
+	}
+
+	/** Requests one default-constructed set of local properties. */
+	@Override
+	protected List<RequestedLocalProperties> createPossibleLocalProperties() {
+		final RequestedLocalProperties requested = new RequestedLocalProperties();
+		return Collections.singletonList(requested);
+	}
+
+	/** Passes the input's global properties through as-is. */
+	@Override
+	public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
+		return gProps;
+	}
+
+	/** Passes the input's local properties through as-is. */
+	@Override
+	public LocalProperties computeLocalProperties(LocalProperties lProps) {
+		return lProps;
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericMapPartition.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericMapPartition.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericMapPartition.java
new file mode 100644
index 0000000..accf731
--- /dev/null
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericMapPartition.java
@@ -0,0 +1,16 @@
+package eu.stratosphere.api.common.functions;
+
+import eu.stratosphere.util.Collector;
+
+import java.util.Iterator;
+
+/**
+ * Generic interface for functions that process an entire partition of a data set at once.
+ *
+ * @param <T> The type of the input records.
+ * @param <O> The type of the output records.
+ */
+public interface GenericMapPartition<T, O> extends Function {
+
+	/**
+	 * A user-implemented function that receives all records of one partition through an
+	 * iterator and may emit any number of result records to the collector.
+	 *
+	 * @param records An iterator over all records of the partition.
+	 * @param out The collector to hand results to.
+	 * @throws Exception Implementations may throw exceptions, which cause the operation to fail.
+	 */
+	void mapPartition(Iterator<T> records, Collector<O> out) throws Exception;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/base/MapPartitionOperatorBase.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/base/MapPartitionOperatorBase.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/base/MapPartitionOperatorBase.java
new file mode 100644
index 0000000..9e3552a
--- /dev/null
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/base/MapPartitionOperatorBase.java
@@ -0,0 +1,43 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.api.common.operators.base;
+
+import eu.stratosphere.api.common.functions.GenericMapPartition;
+import eu.stratosphere.api.common.operators.SingleInputOperator;
+import eu.stratosphere.api.common.operators.UnaryOperatorInformation;
+import eu.stratosphere.api.common.operators.util.UserCodeClassWrapper;
+import eu.stratosphere.api.common.operators.util.UserCodeObjectWrapper;
+import eu.stratosphere.api.common.operators.util.UserCodeWrapper;
+
+
+/**
+ * Base operator for the MapPartition transformation: a single-input operator whose
+ * user-defined function ({@link GenericMapPartition}) processes an entire partition of its input at once.
+ *
+ * @param <IN> The input type.
+ * @param <OUT> The result type.
+ * @param <FT> The type of the user-defined function.
+ */
+public class MapPartitionOperatorBase<IN, OUT, FT extends GenericMapPartition<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
+
+	/**
+	 * Creates a MapPartition operator from an already wrapped user function.
+	 *
+	 * @param udf The wrapper holding the user-defined function.
+	 * @param operatorInfo The input and output type information of the operator.
+	 * @param name The name of the operator.
+	 */
+	public MapPartitionOperatorBase(UserCodeWrapper<FT> udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {
+		super(udf, operatorInfo, name);
+	}
+
+	/**
+	 * Creates a MapPartition operator from a user function instance.
+	 *
+	 * @param udf The user-defined function object.
+	 * @param operatorInfo The input and output type information of the operator.
+	 * @param name The name of the operator.
+	 */
+	public MapPartitionOperatorBase(FT udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {
+		this(new UserCodeObjectWrapper<FT>(udf), operatorInfo, name);
+	}
+
+	/**
+	 * Creates a MapPartition operator from a user function class.
+	 *
+	 * @param udf The class of the user-defined function.
+	 * @param operatorInfo The input and output type information of the operator.
+	 * @param name The name of the operator.
+	 */
+	public MapPartitionOperatorBase(Class<? extends FT> udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {
+		this(new UserCodeClassWrapper<FT>(udf), operatorInfo, name);
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/MapPartitionFunction.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/MapPartitionFunction.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/MapPartitionFunction.java
new file mode 100644
index 0000000..4c0155f
--- /dev/null
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/MapPartitionFunction.java
@@ -0,0 +1,36 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.api.java.functions;
+
+
+import eu.stratosphere.api.common.functions.AbstractFunction;
+import eu.stratosphere.api.common.functions.GenericMapPartition;
+import eu.stratosphere.util.Collector;
+
+import java.util.Iterator;
+
+/**
+ * Base class for MapPartition functions. The function is called once per parallel partition of
+ * the input and receives all elements of that partition through an iterator. It may emit any
+ * number of result elements through the collector.
+ *
+ * @param <IN> Type of the input elements.
+ * @param <OUT> Type of the returned elements.
+ */
+public abstract class MapPartitionFunction<IN, OUT> extends AbstractFunction implements GenericMapPartition<IN, OUT> {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * The core method of the MapPartitionFunction. It is invoked with an iterator over all
+	 * elements of one partition and hands its results to the collector.
+	 *
+	 * @param records All records of the partition.
+	 * @param out The collector to hand results to.
+	 *
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+	 *                   to fail and may trigger recovery.
+	 */
+	@Override
+	public abstract void mapPartition(Iterator<IN> records, Collector<OUT> out) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/MapPartitionOperator.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/MapPartitionOperator.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/MapPartitionOperator.java
new file mode 100644
index 0000000..836b205
--- /dev/null
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/MapPartitionOperator.java
@@ -0,0 +1,67 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+package eu.stratosphere.api.java.operators;
+
+import eu.stratosphere.api.common.functions.GenericMapPartition;
+import eu.stratosphere.api.common.operators.Operator;
+import eu.stratosphere.api.common.operators.UnaryOperatorInformation;
+import eu.stratosphere.api.common.operators.base.MapPartitionOperatorBase;
+import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.functions.MapFunction;
+import eu.stratosphere.api.java.functions.MapPartitionFunction;
+import eu.stratosphere.api.java.typeutils.TypeExtractor;
+
+/**
+ * This operator represents the application of a "mapPartition" function on a data set, and the
+ * result data set produced by the function.
+ *
+ * @param <IN> The type of the data set consumed by the operator.
+ * @param <OUT> The type of the data set created by the operator.
+ *
+ * @see MapPartitionFunction
+ */
+public class MapPartitionOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapPartitionOperator<IN, OUT>> {
+
+	protected final MapPartitionFunction<IN, OUT> function;
+
+	/**
+	 * Creates a MapPartition operator that applies the given function to the partitions of the input.
+	 *
+	 * @param input The data set the function is applied to.
+	 * @param function The user-defined MapPartition function.
+	 */
+	public MapPartitionOperator(DataSet<IN> input, MapPartitionFunction<IN, OUT> function) {
+		// NOTE(review): TypeExtractor is not among the files changed by this commit;
+		// confirm that getMapPartitionReturnTypes exists.
+		super(input, TypeExtractor.getMapPartitionReturnTypes(function, input.getType()));
+
+		this.function = function;
+		extractSemanticAnnotationsFromUdf(function.getClass());
+	}
+
+	@Override
+	protected MapPartitionOperatorBase<IN, OUT, GenericMapPartition<IN, OUT>> translateToDataFlow(Operator<IN> input) {
+
+		String name = getName() != null ? getName() : function.getClass().getName();
+
+		// create operator
+		MapPartitionOperatorBase<IN, OUT, GenericMapPartition<IN, OUT>> po =
+				new MapPartitionOperatorBase<IN, OUT, GenericMapPartition<IN, OUT>>(function,
+						new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
+		// set input
+		po.setInput(input);
+
+		// set dop: an explicitly specified value wins, otherwise the input's dop is inherited
+		if (this.getParallelism() > 0) {
+			po.setDegreeOfParallelism(this.getParallelism());
+		} else {
+			po.setDegreeOfParallelism(input.getDegreeOfParallelism());
+		}
+
+		return po;
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/MapPartitionFunction.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/MapPartitionFunction.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/MapPartitionFunction.java
new file mode 100644
index 0000000..5284d06
--- /dev/null
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/MapPartitionFunction.java
@@ -0,0 +1,44 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.api.java.record.functions;
+
+import eu.stratosphere.api.common.functions.AbstractFunction;
+import eu.stratosphere.api.common.functions.GenericMapPartition;
+import eu.stratosphere.types.Record;
+import eu.stratosphere.util.Collector;
+
+import java.util.Iterator;
+
+/**
+ * The MapPartitionFunction must be extended to provide a map partition implementation.
+ * The function is called once per partition of the input and receives all records of
+ * that partition.
+ */
+public abstract class MapPartitionFunction extends AbstractFunction implements GenericMapPartition<Record, Record> {
+	
+	private static final long serialVersionUID = 1L;
+	
+	/**
+	 * This method must be implemented to provide a user implementation of a map partition function.
+	 * It is called once per partition, with an iterator over all records of that partition.
+	 *
+	 * @param records An iterator over all input records of the partition.
+	 * @param out A collector that collects all output records.
+	 *
+	 * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the
+	 *                   runtime catches an exception, it aborts the map partition task and lets the fail-over
+	 *                   logic decide whether to retry the task execution.
+	 */
+	@Override
+	public abstract void mapPartition(Iterator<Record> records, Collector<Record> out) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/MapPartitionOperator.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/MapPartitionOperator.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/MapPartitionOperator.java
new file mode 100644
index 0000000..1ac132c
--- /dev/null
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/MapPartitionOperator.java
@@ -0,0 +1,190 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.api.java.record.operators;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import eu.stratosphere.api.common.operators.base.MapPartitionOperatorBase;
+import eu.stratosphere.api.java.record.functions.MapPartitionFunction;
+import org.apache.commons.lang3.Validate;
+
+
+
+import eu.stratosphere.api.common.operators.Operator;
+import eu.stratosphere.api.common.operators.RecordOperator;
+import eu.stratosphere.api.common.operators.util.UserCodeClassWrapper;
+import eu.stratosphere.api.common.operators.util.UserCodeObjectWrapper;
+import eu.stratosphere.api.common.operators.util.UserCodeWrapper;
+import eu.stratosphere.api.java.record.functions.FunctionAnnotation;
+import eu.stratosphere.types.Key;
+import eu.stratosphere.types.Record;
+
+/**
+ * MapPartitionOperator that applies a {@link MapPartitionFunction} to the partitions of its input:
+ * the function is called once per partition and receives all records of that partition.
+ * 
+ * @see MapPartitionFunction
+ */
+public class MapPartitionOperator extends MapPartitionOperatorBase<Record, Record, MapPartitionFunction> implements RecordOperator {
+	
+	private static final String DEFAULT_NAME = "<Unnamed MapPartition>";
+	
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Creates a Builder with the provided {@link MapPartitionFunction} implementation.
+	 * 
+	 * @param udf The {@link MapPartitionFunction} implementation for this MapPartition operator.
+	 */
+	public static Builder builder(MapPartitionFunction udf) {
+		return new Builder(new UserCodeObjectWrapper<MapPartitionFunction>(udf));
+	}
+	
+	/**
+	 * Creates a Builder with the provided {@link MapPartitionFunction} implementation class.
+	 * 
+	 * @param udf The {@link MapPartitionFunction} implementation class for this MapPartition operator.
+	 */
+	public static Builder builder(Class<? extends MapPartitionFunction> udf) {
+		return new Builder(new UserCodeClassWrapper<MapPartitionFunction>(udf));
+	}
+	
+	/**
+	 * The constructor that is only invoked from the Builder.
+	 * 
+	 * @param builder The builder holding the operator's configuration.
+	 */
+	protected MapPartitionOperator(Builder builder) {
+		super(builder.udf, OperatorInfoHelper.unary(), builder.name);
+		
+		// several inputs are combined into a union before they reach the function
+		if (builder.inputs != null && !builder.inputs.isEmpty()) {
+			setInput(Operator.createUnionCascade(builder.inputs));
+		}
+		
+		setBroadcastVariables(builder.broadcastInputs);
+		setSemanticProperties(FunctionAnnotation.readSingleConstantAnnotations(builder.udf));
+	}
+	
+
+	@Override
+	public Class<? extends Key<?>>[] getKeyClasses() {
+		return emptyClassArray();
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Builder pattern, straight from Joshua Bloch's Effective Java (2nd Edition).
+	 */
+	public static class Builder {
+		
+		/* The required parameters */
+		private final UserCodeWrapper<MapPartitionFunction> udf;
+		
+		/* The optional parameters; the collections are owned by the builder and never shared */
+		private final List<Operator<Record>> inputs;
+		private final Map<String, Operator<Record>> broadcastInputs;
+		private String name = DEFAULT_NAME;
+		
+		/**
+		 * Creates a Builder with the provided {@link MapPartitionFunction} implementation.
+		 * 
+		 * @param udf The {@link MapPartitionFunction} implementation for this MapPartition operator.
+		 */
+		private Builder(UserCodeWrapper<MapPartitionFunction> udf) {
+			this.udf = udf;
+			this.inputs = new ArrayList<Operator<Record>>();
+			this.broadcastInputs = new HashMap<String, Operator<Record>>();
+		}
+		
+		/**
+		 * Sets the input.
+		 * 
+		 * @param input The input.
+		 * @throws NullPointerException If the input is null.
+		 */
+		public Builder input(Operator<Record> input) {
+			Validate.notNull(input, "The input must not be null");
+			
+			this.inputs.clear();
+			this.inputs.add(input);
+			return this;
+		}
+		
+		/**
+		 * Sets one or several inputs (union).
+		 * 
+		 * @param inputs The inputs.
+		 * @throws NullPointerException If any of the inputs is null.
+		 */
+		public Builder input(Operator<Record>...inputs) {
+			// validate everything first so that a failed call leaves the builder unchanged
+			for (Operator<Record> c : inputs) {
+				Validate.notNull(c, "The input must not be null");
+			}
+			this.inputs.clear();
+			for (Operator<Record> c : inputs) {
+				this.inputs.add(c);
+			}
+			return this;
+		}
+		
+		/**
+		 * Sets the inputs. The given list is copied, so later changes to it do not affect
+		 * the builder, and later builder calls do not modify it.
+		 * 
+		 * @param inputs The inputs; {@code null} removes all inputs.
+		 */
+		public Builder inputs(List<Operator<Record>> inputs) {
+			this.inputs.clear();
+			if (inputs != null) {
+				this.inputs.addAll(inputs);
+			}
+			return this;
+		}
+		
+		/**
+		 * Binds the result produced by a plan rooted at {@code input} to a
+		 * variable with the given name, used by the UDF wrapped in this operator.
+		 */
+		public Builder setBroadcastVariable(String name, Operator<Record> input) {
+			this.broadcastInputs.put(name, input);
+			return this;
+		}
+		
+		/**
+		 * Binds multiple broadcast variables, replacing any previously bound ones.
+		 */
+		public Builder setBroadcastVariables(Map<String, Operator<Record>> inputs) {
+			this.broadcastInputs.clear();
+			this.broadcastInputs.putAll(inputs);
+			return this;
+		}
+		
+		/**
+		 * Sets the name of this operator.
+		 * 
+		 * @param name The name; if {@code null}, the UDF class name is used on {@link #build()}.
+		 */
+		public Builder name(String name) {
+			this.name = name;
+			return this;
+		}
+		
+		/**
+		 * Creates and returns a MapPartitionOperator from the values given
+		 * to the builder.
+		 * 
+		 * @return The created operator
+		 */
+		public MapPartitionOperator build() {
+			if (name == null) {
+				name = udf.getUserCodeClass().getName();
+			}
+			return new MapPartitionOperator(this);
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java
new file mode 100644
index 0000000..0ec5f15
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java
@@ -0,0 +1,92 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.pact.runtime.task;
+
+import eu.stratosphere.api.common.functions.GenericMapPartition;
+import eu.stratosphere.pact.runtime.util.MutableToRegularIteratorWrapper;
+import eu.stratosphere.util.Collector;
+import eu.stratosphere.util.MutableObjectIterator;
+
+
+/**
+ * MapPartition task which is executed by a Nephele task manager. The task has a single
+ * input and one or multiple outputs. It is provided with a {@link GenericMapPartition}
+ * implementation.
+ * <p>
+ * The MapPartitionDriver wraps its input in a single iterator over all records of the partition
+ * and hands that iterator, together with the output collector, to the function's
+ * <code>mapPartition()</code> method in one call.
+ *
+ * @see GenericMapPartition
+ *
+ * @param <IT> The mapper's input data type.
+ * @param <OT> The mapper's output data type.
+ */
+public class MapPartitionDriver<IT, OT> implements PactDriver<GenericMapPartition<IT, OT>, OT> {
+
+ private PactTaskContext<GenericMapPartition<IT, OT>, OT> taskContext;
+
+ // Set in setup() and cleared in cancel(). run() does not currently read it, because the whole
+ // partition is handed to the user function in a single mapPartition() call.
+ private volatile boolean running;
+
+
+ @Override
+ public void setup(PactTaskContext<GenericMapPartition<IT, OT>, OT> context) {
+ this.taskContext = context;
+ this.running = true;
+ }
+
+ @Override
+ public int getNumberOfInputs() {
+ return 1;
+ }
+
+ @Override
+ public Class<GenericMapPartition<IT, OT>> getStubType() {
+ @SuppressWarnings("unchecked")
+ final Class<GenericMapPartition<IT, OT>> clazz = (Class<GenericMapPartition<IT, OT>>) (Class<?>) GenericMapPartition.class;
+ return clazz;
+ }
+
+ @Override
+ public boolean requiresComparatorOnInput() {
+ return false;
+ }
+
+ @Override
+ public void prepare() {
+ // nothing, since a mapper does not need any preparation
+ }
+
+ @Override
+ public void run() throws Exception {
+ // cache references on the stack
+ final MutableObjectIterator<IT> input = this.taskContext.getInput(0);
+ final GenericMapPartition<IT, OT> function = this.taskContext.getStub();
+ final Collector<OT> output = this.taskContext.getOutputCollector();
+
+ // Wrap the mutable-object input iterator in a regular iterator for the user function.
+ final MutableToRegularIteratorWrapper<IT> inIter =
+ new MutableToRegularIteratorWrapper<IT>(input, this.taskContext.<IT>getInputSerializer(0).getSerializer());
+
+ function.mapPartition(inIter, output);
+ }
+
+ @Override
+ public void cleanup() {
+ // mappers need no cleanup, since no strategies are used.
+ }
+
+ @Override
+ public void cancel() {
+ // NOTE(review): run() does not poll this flag, so cancellation has no effect on an
+ // in-progress mapPartition() call from within this driver.
+ this.running = false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d4de9774/stratosphere-tests/src/test/java/eu/stratosphere/test/operators/MapPartitionITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/operators/MapPartitionITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/operators/MapPartitionITCase.java
new file mode 100644
index 0000000..5da0f99
--- /dev/null
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/operators/MapPartitionITCase.java
@@ -0,0 +1,130 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.test.operators;
+
+import eu.stratosphere.api.common.Plan;
+import eu.stratosphere.api.java.record.operators.FileDataSink;
+import eu.stratosphere.api.java.record.operators.FileDataSource;
+import eu.stratosphere.api.java.record.functions.MapPartitionFunction;
+import eu.stratosphere.api.java.record.io.DelimitedInputFormat;
+import eu.stratosphere.api.java.record.operators.MapPartitionOperator;
+import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.test.operators.io.ContractITCaseIOFormats.ContractITCaseInputFormat;
+import eu.stratosphere.test.operators.io.ContractITCaseIOFormats.ContractITCaseOutputFormat;
+import eu.stratosphere.test.util.RecordAPITestBase;
+import eu.stratosphere.types.IntValue;
+import eu.stratosphere.types.Record;
+import eu.stratosphere.types.StringValue;
+import eu.stratosphere.util.Collector;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+/**
+ * Integration test for the record-API {@link MapPartitionOperator}: a map-partition function
+ * filters "key value" integer pairs and rewrites the surviving ones.
+ */
+@RunWith(Parameterized.class)
+public class MapPartitionITCase extends RecordAPITestBase {
+
+ private static final Log LOG = LogFactory.getLog(MapPartitionITCase.class);
+
+ // Config key for the degree of parallelism, shared by the source and the map-partition
+ // operator and set by getConfigurations().
+ private static final String NUM_SUBTASKS_KEY = "MapPartitionTest#NoSubtasks";
+
+ private String inPath = null;
+ private String resultPath = null;
+
+ public MapPartitionITCase(Configuration testConfig) {
+ super(testConfig);
+ }
+
+ // Input: one "key value" pair of integers per line.
+ private static final String IN = "1 1\n2 2\n2 8\n4 4\n4 4\n6 6\n7 7\n8 8\n" +
+ "1 1\n2 2\n2 2\n4 4\n4 4\n6 3\n5 9\n8 8\n1 1\n2 2\n2 2\n3 0\n4 4\n" +
+ "5 9\n7 7\n8 8\n1 1\n9 1\n5 9\n4 4\n4 4\n6 6\n7 7\n8 8\n";
+
+ // Expected output: one line per input pair with key + value < 10, holding the value followed
+ // by key + 10.
+ private static final String RESULT = "1 11\n2 12\n4 14\n4 14\n1 11\n2 12\n2 12\n4 14\n4 14\n3 16\n1 11\n2 12\n2 12\n0 13\n4 14\n1 11\n4 14\n4 14\n";
+
+ @Override
+ protected void preSubmit() throws Exception {
+ inPath = createTempFile("in.txt", IN);
+ resultPath = getTempDirPath("result");
+ }
+
+ /**
+ * Consumes the whole partition iterator. For every record whose integer key and value sum to
+ * less than 10, it emits a record with fields (value, key + 10).
+ */
+ public static class TestMapPartition extends MapPartitionFunction implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private StringValue keyString = new StringValue();
+ private StringValue valueString = new StringValue();
+
+
+ @Override
+ public void mapPartition(Iterator<Record> records, Collector<Record> out) throws Exception {
+ while (records.hasNext()) {
+ Record record = records.next();
+ keyString = record.getField(0, keyString);
+ valueString = record.getField(1, valueString);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processed: [" + keyString.toString() + "," + valueString.getValue() + "]");
+ }
+
+ // parse each field once; both hold decimal integers in this test's input
+ final int key = Integer.parseInt(keyString.toString());
+ final int value = Integer.parseInt(valueString.toString());
+
+ if (key + value < 10) {
+ record.setField(0, valueString);
+ record.setField(1, new IntValue(key + 10));
+
+ out.collect(record);
+ }
+ }
+ }
+ }
+
+ @Override
+ protected Plan getTestJob() {
+ // Source and mapper read the same key, so the parameterized configurations actually
+ // exercise a parallel mapPartition.
+ final int numSubtasks = config.getInteger(NUM_SUBTASKS_KEY, 1);
+
+ FileDataSource input = new FileDataSource(
+ new ContractITCaseInputFormat(), inPath);
+ DelimitedInputFormat.configureDelimitedFormat(input)
+ .recordDelimiter('\n');
+ input.setDegreeOfParallelism(numSubtasks);
+
+ MapPartitionOperator testMapper = MapPartitionOperator.builder(new TestMapPartition()).build();
+ testMapper.setDegreeOfParallelism(numSubtasks);
+
+ FileDataSink output = new FileDataSink(
+ new ContractITCaseOutputFormat(), resultPath);
+ output.setDegreeOfParallelism(1);
+
+ output.setInput(testMapper);
+ testMapper.setInput(input);
+
+ return new Plan(output);
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(RESULT, resultPath);
+ }
+
+ @Parameters
+ public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+ LinkedList<Configuration> testConfigs = new LinkedList<Configuration>();
+
+ Configuration config = new Configuration();
+ config.setInteger(NUM_SUBTASKS_KEY, 4);
+ testConfigs.add(config);
+
+ return toParameterList(testConfigs);
+ }
+}