You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/15 13:08:44 UTC
[2/3] git commit: [FLINK-1053] Cleanup implementation of mapPartition
function
[FLINK-1053] Cleanup implementation of mapPartition function
This closes #42
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d0cead7e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d0cead7e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d0cead7e
Branch: refs/heads/master
Commit: d0cead7e19ab4c580705799adb5ca460dd228809
Parents: d4de977
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Aug 14 19:56:42 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Aug 14 22:43:26 2014 +0200
----------------------------------------------------------------------
.../org/apache/flink/compiler/PactCompiler.java | 2 +
.../flink/compiler/dag/MapPartitionNode.java | 63 ++++++
.../operators/MapPartitionDescriptor.java | 66 +++++++
.../api/common/functions/FlatMapFunction.java | 2 +-
.../common/functions/MapPartitionFunction.java | 53 +++++
.../base/MapPartitionOperatorBase.java | 47 +++++
.../java/org/apache/flink/api/java/DataSet.java | 25 +--
.../functions/RichMapPartitionFunction.java | 41 ++++
.../java/operators/MapPartitionOperator.java | 68 +++++++
.../record/functions/MapPartitionFunction.java | 46 +++++
.../record/operators/MapPartitionOperator.java | 193 +++++++++++++++++++
.../flink/api/java/typeutils/TypeExtractor.java | 10 +
.../runtime/operators/MapPartitionDriver.java | 89 +++++++++
.../flink/test/util/AbstractTestBase.java | 12 ++
.../flink/test/util/JavaProgramTestBase.java | 35 ++++
.../test/operators/MapPartitionITCase.java | 101 ++++++++++
.../compiler/dag/MapPartitionNode.java | 56 ------
.../operators/MapPartitionDescriptor.java | 60 ------
.../common/functions/GenericMapPartition.java | 16 --
.../base/MapPartitionOperatorBase.java | 43 -----
.../java/functions/MapPartitionFunction.java | 36 ----
.../java/operators/MapPartitionOperator.java | 67 -------
.../record/functions/MapPartitionFunction.java | 44 -----
.../record/operators/MapPartitionOperator.java | 190 ------------------
.../pact/runtime/task/MapPartitionDriver.java | 92 ---------
.../test/operators/MapPartitionITCase.java | 130 -------------
26 files changed, 840 insertions(+), 747 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
index f16a0a2..de5a3e1 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
@@ -45,6 +45,7 @@ import org.apache.flink.api.common.operators.base.GenericDataSourceBase;
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.common.operators.base.MapPartitionOperatorBase;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
import org.apache.flink.api.common.operators.base.BulkIterationBase.PartialSolutionPlaceHolder;
import org.apache.flink.api.common.operators.base.DeltaIterationBase.SolutionSetPlaceHolder;
@@ -64,6 +65,7 @@ import org.apache.flink.compiler.dag.FlatMapNode;
import org.apache.flink.compiler.dag.GroupReduceNode;
import org.apache.flink.compiler.dag.IterationNode;
import org.apache.flink.compiler.dag.MapNode;
+import org.apache.flink.compiler.dag.MapPartitionNode;
import org.apache.flink.compiler.dag.MatchNode;
import org.apache.flink.compiler.dag.OptimizerNode;
import org.apache.flink.compiler.dag.PactConnection;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/flink-compiler/src/main/java/org/apache/flink/compiler/dag/MapPartitionNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/MapPartitionNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/MapPartitionNode.java
new file mode 100644
index 0000000..fab5664
--- /dev/null
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/MapPartitionNode.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.compiler.dag;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.operators.SingleInputOperator;
+import org.apache.flink.compiler.DataStatistics;
+import org.apache.flink.compiler.operators.MapPartitionDescriptor;
+import org.apache.flink.compiler.operators.OperatorDescriptorSingle;
+
+/**
+ * The optimizer's internal representation of a <i>MapPartition</i> operator node.
+ */
+public class MapPartitionNode extends SingleInputNode {
+
+ /**
+ * Creates a new MapPartitionNode for the given contract.
+ *
+ * @param operator The map partition contract object.
+ */
+ public MapPartitionNode(SingleInputOperator<?, ?, ?> operator) {
+ super(operator);
+ }
+
+ @Override
+ public String getName() {
+ return "MapPartition";
+ }
+
+ @Override
+ protected List<OperatorDescriptorSingle> getPossibleProperties() {
+ return Collections.<OperatorDescriptorSingle>singletonList(new MapPartitionDescriptor());
+ }
+
+ /**
+ * Computes the estimates for the MapPartition operator.
+ * A MapPartition function may emit an arbitrary number of elements per partition,
+ * so no default estimates (such as an unchanged cardinality) can be made here.
+ */
+ @Override
+ protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
+ // we really cannot make any estimates here
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapPartitionDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapPartitionDescriptor.java
new file mode 100644
index 0000000..5392339
--- /dev/null
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapPartitionDescriptor.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.compiler.operators;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.compiler.dag.SingleInputNode;
+import org.apache.flink.compiler.dataproperties.GlobalProperties;
+import org.apache.flink.compiler.dataproperties.LocalProperties;
+import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.compiler.dataproperties.RequestedLocalProperties;
+import org.apache.flink.compiler.plan.Channel;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+
+public class MapPartitionDescriptor extends OperatorDescriptorSingle {
+
+ @Override
+ public DriverStrategy getStrategy() {
+ return DriverStrategy.MAP_PARTITION;
+ }
+
+ @Override
+ public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
+ return new SingleInputPlanNode(node, "MapPartition ("+node.getPactContract().getName()+")", in, DriverStrategy.MAP_PARTITION);
+ }
+
+ @Override
+ protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
+ return Collections.singletonList(new RequestedGlobalProperties());
+ }
+
+ @Override
+ protected List<RequestedLocalProperties> createPossibleLocalProperties() {
+ return Collections.singletonList(new RequestedLocalProperties());
+ }
+
+ @Override
+ public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
+ return gProps;
+ }
+
+ @Override
+ public LocalProperties computeLocalProperties(LocalProperties lProps) {
+ return lProps;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java
index 0c32eae..7809599 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java
@@ -23,7 +23,7 @@ import org.apache.flink.util.Collector;
import java.io.Serializable;
/**
- * interface flatMap functions. FlatMap functions take elements and transform them,
+ * Base interface for flatMap functions. FlatMap functions take elements and transform them,
* into zero, one, or more elements. Typical applications can be splitting elements, or unnesting lists
* and arrays. Operations that produce multiple strictly one result element per input element can also
* use the {@link MapFunction}.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/flink-core/src/main/java/org/apache/flink/api/common/functions/MapPartitionFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/MapPartitionFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/MapPartitionFunction.java
new file mode 100644
index 0000000..fbd5c9e
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/MapPartitionFunction.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+/**
+ * Interface for "mapPartition" functions. A "mapPartition" function is called a single time per
+ * data partition and receives an Iterable with the data elements of that partition. It may return
+ * an arbitrary number of data elements.
+ * <p>
+ * This function is intended to provide enhanced flexibility in the processing of elements in a partition.
+ * For most of the simple use cases, consider using the {@link MapFunction} or {@link FlatMapFunction}.
+ * <p>
+ * The basic syntax for a MapPartitionFunction is as follows:
+ * <pre><blockquote>
+ * DataSet<X> input = ...;
+ *
+ * DataSet<Y> result = input.mapPartition(new MyMapPartitionFunction());
+ * </blockquote></pre>
+ *
+ * @param <T> Type of the input elements.
+ * @param <O> Type of the returned elements.
+ */
+public interface MapPartitionFunction<T, O> extends Function, Serializable {
+
+ /**
+ * A user-implemented function that processes all elements of one data partition.
+ *
+ * @param records All records of the partition.
+ * @param out The collector to hand results to.
+ * @throws Exception The function may throw exceptions to signal that processing failed.
+ */
+ void mapPartition(Iterable<T> records, Collector<O> out) throws Exception;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java
new file mode 100644
index 0000000..ae577fd
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators.base;
+
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.operators.SingleInputOperator;
+import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
+import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+
+/**
+ *
+ * @param <IN> The input type.
+ * @param <OUT> The result type.
+ * @param <FT> The type of the user-defined function.
+ */
+public class MapPartitionOperatorBase<IN, OUT, FT extends MapPartitionFunction<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
+
+ public MapPartitionOperatorBase(UserCodeWrapper<FT> udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {
+ super(udf, operatorInfo, name);
+ }
+
+ public MapPartitionOperatorBase(FT udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {
+ super(new UserCodeObjectWrapper<FT>(udf), operatorInfo, name);
+ }
+
+ public MapPartitionOperatorBase(Class<? extends FT> udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {
+ super(new UserCodeClassWrapper<FT>(udf), operatorInfo, name);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 2b15007..d8450b5 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.io.FileOutputFormat;
@@ -37,7 +38,6 @@ import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.CoGroupOperator;
import org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets;
import org.apache.flink.api.java.operators.CrossOperator;
-import org.apache.flink.api.java.operators.CrossOperator.DefaultCross;
import org.apache.flink.api.java.operators.CustomUnaryOperation;
import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.api.java.operators.DistinctOperator;
@@ -47,6 +47,7 @@ import org.apache.flink.api.java.operators.JoinOperator.JoinHint;
import org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets;
import org.apache.flink.api.java.operators.Keys;
import org.apache.flink.api.java.operators.MapOperator;
+import org.apache.flink.api.java.operators.MapPartitionOperator;
import org.apache.flink.api.java.operators.ProjectOperator.Projection;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.ReduceOperator;
@@ -144,7 +145,7 @@ public abstract class DataSet<T> {
/**
- * Applies a Map operation to the entire partition of the data.
+ * Applies a Map-style operation to the entire partition of the data.
* The function is called once per parallel partition of the data,
* and the entire partition is available through the given Iterator.
* The number of elements that each instance of the MapPartition function
@@ -583,19 +584,19 @@ public abstract class DataSet<T> {
* both DataSets, i.e., it builds a Cartesian product.
*
* <p>
- * The resulting {@link DefaultCross} wraps each pair of crossed elements into a {@link Tuple2}, with
+ * The resulting {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross} wraps each pair of crossed elements into a {@link Tuple2}, with
* the element of the first input being the first field of the tuple and the element of the
* second input being the second field of the tuple.
*
* <p>
- * Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(CrossFunction)} to define a {@link CrossFunction} which is called for
+ * Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a {@link CrossFunction} which is called for
* each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements.</br>
*
* @param other The other DataSet with which this DataSet is crossed.
* @return A DefaultCross that returns a Tuple2 for each pair of crossed elements.
*
- * @see DefaultCross
- * @see CrossFunction
+ * @see org.apache.flink.api.java.operators.CrossOperator.DefaultCross
+ * @see org.apache.flink.api.common.functions.CrossFunction
* @see DataSet
* @see Tuple2
*/
@@ -612,19 +613,19 @@ public abstract class DataSet<T> {
* smaller than the first one.
*
* <p>
- * The resulting {@link DefaultCross} wraps each pair of crossed elements into a {@link Tuple2}, with
+ * The resulting {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross} wraps each pair of crossed elements into a {@link Tuple2}, with
* the element of the first input being the first field of the tuple and the element of the
* second input being the second field of the tuple.
*
* <p>
- * Call {@link DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a
+ * Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a
* {@link org.apache.flink.api.common.functions.CrossFunction} which is called for
* each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements.</br>
*
* @param other The other DataSet with which this DataSet is crossed.
* @return A DefaultCross that returns a Tuple2 for each pair of crossed elements.
*
- * @see DefaultCross
+ * @see org.apache.flink.api.java.operators.CrossOperator.DefaultCross
* @see org.apache.flink.api.common.functions.CrossFunction
* @see DataSet
* @see Tuple2
@@ -642,19 +643,19 @@ public abstract class DataSet<T> {
* larger than the first one.
*
* <p>
- * The resulting {@link DefaultCross} wraps each pair of crossed elements into a {@link Tuple2}, with
+ * The resulting {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross} wraps each pair of crossed elements into a {@link Tuple2}, with
* the element of the first input being the first field of the tuple and the element of the
* second input being the second field of the tuple.
*
* <p>
- * Call {@link DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a
+ * Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a
* {@link org.apache.flink.api.common.functions.CrossFunction} which is called for
* each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements.</br>
*
* @param other The other DataSet with which this DataSet is crossed.
* @return A DefaultCross that returns a Tuple2 for each pair of crossed elements.
*
- * @see DefaultCross
+ * @see org.apache.flink.api.java.operators.CrossOperator.DefaultCross
* @see org.apache.flink.api.common.functions.CrossFunction
* @see DataSet
* @see Tuple2
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/flink-java/src/main/java/org/apache/flink/api/java/functions/RichMapPartitionFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichMapPartitionFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichMapPartitionFunction.java
new file mode 100644
index 0000000..04176a2
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichMapPartitionFunction.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.functions;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * Rich variant of the {@link MapPartitionFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * @param <I> Type of the input elements.
+ * @param <O> Type of the returned elements.
+ */
+public abstract class RichMapPartitionFunction<I, O> extends AbstractRichFunction implements MapPartitionFunction<I, O> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public abstract void mapPartition(Iterable<I> records, Collector<O> out) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
new file mode 100644
index 0000000..caf55f9
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators;
+
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.base.MapPartitionOperatorBase;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.DataSet;
+
+/**
+ * This operator represents the application of a "mapPartition" function on a data set, and the
+ * result data set produced by the function.
+ *
+ * @param <IN> The type of the data set consumed by the operator.
+ * @param <OUT> The type of the data set created by the operator.
+ *
+ * @see MapPartitionFunction
+ */
+public class MapPartitionOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapPartitionOperator<IN, OUT>> {
+
+ protected final MapPartitionFunction<IN, OUT> function;
+
+
+ public MapPartitionOperator(DataSet<IN> input, MapPartitionFunction<IN, OUT> function) {
+ super(input, TypeExtractor.getMapPartitionReturnTypes(function, input.getType()));
+
+ this.function = function;
+ extractSemanticAnnotationsFromUdf(function.getClass());
+ }
+
+ @Override
+ protected MapPartitionOperatorBase<IN, OUT, MapPartitionFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) {
+
+ String name = getName() != null ? getName() : function.getClass().getName();
+ // create operator
+ MapPartitionOperatorBase<IN, OUT, MapPartitionFunction<IN, OUT>> po = new MapPartitionOperatorBase<IN, OUT, MapPartitionFunction<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
+ // set input
+ po.setInput(input);
+ // set dop
+ if(this.getParallelism() > 0) {
+ // use specified dop
+ po.setDegreeOfParallelism(this.getParallelism());
+ } else {
+ // if no dop has been specified, use dop of input operator to enable chaining
+ po.setDegreeOfParallelism(input.getDegreeOfParallelism());
+ }
+
+ return po;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapPartitionFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapPartitionFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapPartitionFunction.java
new file mode 100644
index 0000000..4eb049d
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapPartitionFunction.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.record.functions;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.Collector;
+
+/**
+ * The MapPartitionFunction must be extended to provide a map partition implementation.
+ * By definition, the map partition is called for a full input set.
+ */
+public abstract class MapPartitionFunction extends AbstractRichFunction implements org.apache.flink.api.common.functions.MapPartitionFunction<Record, Record> {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * This method must be implemented to provide a user implementation of a MapPartitionFunction.
+ * It is called for a full input set.
+ *
+ * @param records all input records
+ * @param out A collector that collects all output records.
+ *
+ * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the
+ * runtime catches an exception, it aborts the map task and lets the fail-over logic
+ * decide whether to retry the mapper execution.
+ */
+ @Override
+ public abstract void mapPartition(Iterable<Record> records, Collector<Record> out) throws Exception;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapPartitionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapPartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapPartitionOperator.java
new file mode 100644
index 0000000..6d4f991
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/MapPartitionOperator.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.record.operators;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.RecordOperator;
+import org.apache.flink.api.common.operators.base.MapPartitionOperatorBase;
+import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
+import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.java.record.functions.FunctionAnnotation;
+import org.apache.flink.api.java.record.functions.MapPartitionFunction;
+import org.apache.flink.types.Key;
+import org.apache.flink.types.Record;
+
+/**
+ * MapPartitionOperator that applies a {@link MapPartitionFunction} to a full set of input records at once, rather than to each record independently.
+ *
+ * @see MapPartitionFunction
+ */
+public class MapPartitionOperator extends MapPartitionOperatorBase<Record, Record, MapPartitionFunction> implements RecordOperator {
+
+ private static String DEFAULT_NAME = "<Unnamed MapPartition>";
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Creates a Builder with the provided {@link MapPartitionFunction} implementation.
+ *
+ * @param udf The {@link MapPartitionFunction} implementation for this MapPartition operator.
+ */
+ public static Builder builder(MapPartitionFunction udf) {
+ return new Builder(new UserCodeObjectWrapper<MapPartitionFunction>(udf));
+ }
+
+ /**
+ * Creates a Builder with the provided {@link MapPartitionFunction} implementation class.
+ *
+ * @param udf The {@link MapPartitionFunction} implementation class for this MapPartition operator.
+ */
+ public static Builder builder(Class<? extends MapPartitionFunction> udf) {
+ return new Builder(new UserCodeClassWrapper<MapPartitionFunction>(udf));
+ }
+
+ /**
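+ * The constructor, which only gets invoked from the Builder (or from subclasses).
+ * @param builder The builder providing the UDF, the inputs, the broadcast inputs, and the operator name.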
+ */
+ protected MapPartitionOperator(Builder builder) {
+
+ super(builder.udf, OperatorInfoHelper.unary(), builder.name);
+
+ if (builder.inputs != null && !builder.inputs.isEmpty()) {
+ setInput(Operator.createUnionCascade(builder.inputs));
+ }
+
+ setBroadcastVariables(builder.broadcastInputs);
+ setSemanticProperties(FunctionAnnotation.readSingleConstantAnnotations(builder.udf));
+ }
+
+
+ @Override
+ public Class<? extends Key<?>>[] getKeyClasses() {
+ return emptyClassArray();
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Builder pattern, straight from Joshua Bloch's Effective Java (2nd Edition).
+ */
+ public static class Builder {
+
+ /* The required parameters */
+ private final UserCodeWrapper<MapPartitionFunction> udf;
+
+ /* The optional parameters */
+ private List<Operator<Record>> inputs;
+ private Map<String, Operator<Record>> broadcastInputs;
+ private String name = DEFAULT_NAME;
+
+ /**
+ * Creates a Builder with the provided {@link MapPartitionFunction} implementation.
+ *
+ * @param udf The wrapped {@link MapPartitionFunction} implementation for this MapPartition operator.
+ */
+ private Builder(UserCodeWrapper<MapPartitionFunction> udf) {
+ this.udf = udf;
+ this.inputs = new ArrayList<Operator<Record>>();
+ this.broadcastInputs = new HashMap<String, Operator<Record>>();
+ }
+
+ /**
+ * Sets the input.
+ *
+ * @param input The input.
+ */
+ public Builder input(Operator<Record> input) {
+ Validate.notNull(input, "The input must not be null");
+
+ this.inputs.clear();
+ this.inputs.add(input);
+ return this;
+ }
+
+ /**
+ * Sets one or several inputs (union).
+ *
+ * @param inputs The inputs; they replace any previously set inputs.
+ */
+ public Builder input(Operator<Record>...inputs) {
+ this.inputs.clear();
+ for (Operator<Record> c : inputs) {
+ this.inputs.add(c);
+ }
+ return this;
+ }
+
+ /**
+ * Sets the inputs.
+ *
+ * @param inputs The list of inputs; the builder keeps a reference to this list rather than copying it.
+ */
+ public Builder inputs(List<Operator<Record>> inputs) {
+ this.inputs = inputs;
+ return this;
+ }
+
+ /**
+ * Binds the result produced by a plan rooted at {@code input} to the broadcast
+ * variable {@code name} used by the UDF wrapped in this operator.
+ */
+ public Builder setBroadcastVariable(String name, Operator<Record> input) {
+ this.broadcastInputs.put(name, input);
+ return this;
+ }
+
+ /**
+ * Binds multiple broadcast variables.
+ */
+ public Builder setBroadcastVariables(Map<String, Operator<Record>> inputs) {
+ this.broadcastInputs.clear();
+ this.broadcastInputs.putAll(inputs);
+ return this;
+ }
+
+ /**
+ * Sets the name of this operator.
+ *
+ * @param name The name of the operator.
+ */
+ public Builder name(String name) {
+ this.name = name;
+ return this;
+ }
+
+ /**
+ * Creates and returns a MapPartitionOperator using the values given
+ * to the builder.
+ *
+ * @return The created operator
+ */
+ public MapPartitionOperator build() {
+ if (name == null) {
+ name = udf.getUserCodeClass().getName();
+ }
+ return new MapPartitionOperator(this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index b596a87..a8a833f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -38,6 +38,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.java.functions.InvalidTypesException;
@@ -81,6 +82,15 @@ public class TypeExtractor {
}
@SuppressWarnings("unchecked")
+ public static <IN, OUT> TypeInformation<OUT> getMapPartitionReturnTypes(MapPartitionFunction<IN, OUT> mapInterface, TypeInformation<IN> inType) {
+ validateInputType(MapPartitionFunction.class, mapInterface.getClass(), 0, inType);
+ if(mapInterface instanceof ResultTypeQueryable) {
+ return ((ResultTypeQueryable<OUT>) mapInterface).getProducedType();
+ }
+ return new TypeExtractor().privateCreateTypeInfo(MapPartitionFunction.class, mapInterface.getClass(), 1, inType, null);
+ }
+
+ @SuppressWarnings("unchecked")
public static <IN, OUT> TypeInformation<OUT> getGroupReduceReturnTypes(GroupReduceFunction<IN, OUT> groupReduceInterface,
TypeInformation<IN> inType) {
validateInputType(GroupReduceFunction.class, groupReduceInterface.getClass(), 0, inType);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java
new file mode 100644
index 0000000..260bae5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators;
+
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.runtime.util.MutableToRegularIteratorWrapper;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+/**
+ * MapPartition task which is executed by a Nephele task manager. The task has a single
+ * input and one or multiple outputs. It is provided with a MapPartitionFunction
+ * implementation.
+ * <p>
+ * The MapPartitionDriver creates an iterator over all records of its input and hands that to the <code>mapPartition()</code> method
+ * of the MapPartitionFunction.
+ *
+ * @see MapPartitionFunction
+ *
+ * @param <IT> The mapper's input data type.
+ * @param <OT> The mapper's output data type.
+ */
+public class MapPartitionDriver<IT, OT> implements PactDriver<MapPartitionFunction<IT, OT>, OT> {
+
+ private PactTaskContext<MapPartitionFunction<IT, OT>, OT> taskContext;
+
+ @Override
+ public void setup(PactTaskContext<MapPartitionFunction<IT, OT>, OT> context) {
+ this.taskContext = context;
+ }
+
+ @Override
+ public int getNumberOfInputs() {
+ return 1;
+ }
+
+ @Override
+ public Class<MapPartitionFunction<IT, OT>> getStubType() {
+ @SuppressWarnings("unchecked")
+ final Class<MapPartitionFunction<IT, OT>> clazz = (Class<MapPartitionFunction<IT, OT>>) (Class<?>) MapPartitionFunction.class;
+ return clazz;
+ }
+
+ @Override
+ public boolean requiresComparatorOnInput() {
+ return false;
+ }
+
+ @Override
+ public void prepare() {
+ // nothing, since a mapper does not need any preparation
+ }
+
+ @Override
+ public void run() throws Exception {
+ // cache references on the stack
+ final MutableObjectIterator<IT> input = this.taskContext.getInput(0);
+ final MapPartitionFunction<IT, OT> function = this.taskContext.getStub();
+ final Collector<OT> output = this.taskContext.getOutputCollector();
+
+ final MutableToRegularIteratorWrapper<IT> inIter = new MutableToRegularIteratorWrapper<IT>(input, this.taskContext.<IT>getInputSerializer(0).getSerializer());
+ function.mapPartition(inIter, output);
+ }
+
+ @Override
+ public void cleanup() {
+ // mappers need no cleanup, since no strategies are used.
+ }
+
+ @Override
+ public void cancel() {}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index 06df6a1..5ba2306 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -32,6 +32,7 @@ import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
@@ -293,6 +294,17 @@ public abstract class AbstractTestBase {
}
}
+ public static <X> void compareResultCollections(List<X> expected, List<X> actual, Comparator<X> comparator) {
+ Assert.assertEquals(expected.size(), actual.size());
+
+ Collections.sort(expected, comparator);
+ Collections.sort(actual, comparator);
+
+ for (int i = 0; i < expected.size(); i++) {
+ Assert.assertEquals(expected.get(i), actual.get(i));
+ }
+ }
+
private File[] getAllInvolvedFiles(String resultPath) {
File result = asFile(resultPath);
if (!result.exists()) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
index 9307761..689c424 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
@@ -19,6 +19,8 @@
package org.apache.flink.test.util;
+import java.util.Comparator;
+
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.client.minicluster.NepheleMiniCluster;
@@ -33,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.junit.Assert;
import org.junit.Test;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple;
public abstract class JavaProgramTestBase extends AbstractTestBase {
@@ -179,4 +182,36 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
initializeContextEnvironment(this);
}
}
+
+ public static class TupleComparator<T extends Tuple> implements Comparator<T> {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public int compare(T o1, T o2) {
+ int a1 = o1.getArity();
+ int a2 = o2.getArity();
+
+ if (a1 < a2) {
+ return -1;
+ } else if (a2 < a1) {
+ return 1;
+ } else {
+ for (int i = 0; i < a1; i++) {
+ Object obj1 = o1.getField(i);
+ Object obj2 = o2.getField(i);
+
+ if (!(obj1 instanceof Comparable && obj2 instanceof Comparable)) {
+ Assert.fail("Cannot compare tuple fields");
+ }
+
+ int cmp = ((Comparable<Object>) obj1).compareTo((Comparable<Object>) obj2);
+ if (cmp != 0) {
+ return cmp;
+ }
+ }
+
+ return 0;
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/flink-tests/src/test/java/org/apache/flink/test/operators/MapPartitionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/MapPartitionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/MapPartitionITCase.java
new file mode 100644
index 0000000..55b9544
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/MapPartitionITCase.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.util.Collector;
+
+@SuppressWarnings("serial")
+public class MapPartitionITCase extends JavaProgramTestBase {
+
+ private static final String IN = "1 1\n2 2\n2 8\n4 4\n4 4\n6 6\n7 7\n8 8\n"
+ + "1 1\n2 2\n2 2\n4 4\n4 4\n6 3\n5 9\n8 8\n1 1\n2 2\n2 2\n3 0\n4 4\n"
+ + "5 9\n7 7\n8 8\n1 1\n9 1\n5 9\n4 4\n4 4\n6 6\n7 7\n8 8\n";
+
+ private static final String RESULT = "1 11\n2 12\n4 14\n4 14\n1 11\n2 12\n2 12\n4 14\n4 14\n3 16\n1 11\n2 12\n2 12\n0 13\n4 14\n1 11\n4 14\n4 14\n";
+
+
+ private List<Tuple2<String, String>> input = new ArrayList<Tuple2<String,String>>();
+
+ private List<Tuple2<String, Integer>> expected = new ArrayList<Tuple2<String,Integer>>();
+
+ private List<Tuple2<String, Integer>> result = new ArrayList<Tuple2<String,Integer>>();
+
+
+ @Override
+ protected void preSubmit() throws Exception {
+
+ // create input
+ for (String s :IN.split("\n")) {
+ String[] fields = s.split(" ");
+ input.add(new Tuple2<String, String>(fields[0], fields[1]));
+ }
+
+ // create expected
+ for (String s : RESULT.split("\n")) {
+ String[] fields = s.split(" ");
+ expected.add(new Tuple2<String, Integer>(fields[0], Integer.parseInt(fields[1])));
+ }
+
+ }
+
+ @Override
+ protected void postSubmit() {
+ compareResultCollections(expected, result, new TupleComparator<Tuple2<String, Integer>>());
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple2<String, String>> data = env.fromCollection(input);
+
+ data.mapPartition(new TestMapPartition()).output(new LocalCollectionOutputFormat<Tuple2<String,Integer>>(result));
+
+ env.execute();
+ }
+
+
+ public static class TestMapPartition implements MapPartitionFunction<Tuple2<String, String>, Tuple2<String, Integer>> {
+
+ @Override
+ public void mapPartition(Iterable<Tuple2<String, String>> records, Collector<Tuple2<String, Integer>> out) {
+ for (Tuple2<String, String> record : records) {
+ String keyString = record.f0;
+ String valueString = record.f1;
+
+ int keyInt = Integer.parseInt(keyString);
+ int valueInt = Integer.parseInt(valueString);
+
+ if (keyInt + valueInt < 10) {
+ out.collect(new Tuple2<String, Integer>(valueString, keyInt + 10));
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/MapPartitionNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/MapPartitionNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/MapPartitionNode.java
deleted file mode 100644
index 0d5e4ad..0000000
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/MapPartitionNode.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.compiler.dag;
-
-import java.util.Collections;
-import java.util.List;
-
-import eu.stratosphere.api.common.operators.SingleInputOperator;
-import eu.stratosphere.compiler.DataStatistics;
-import eu.stratosphere.compiler.operators.MapPartitionDescriptor;
-import eu.stratosphere.compiler.operators.OperatorDescriptorSingle;
-
-/**
- * The optimizer's internal representation of a <i>MapPartition</i> operator node.
- */
-public class MapPartitionNode extends SingleInputNode {
-
- /**
- * Creates a new MapNode for the given contract.
- *
- * @param operator The map partition contract object.
- */
- public MapPartitionNode(SingleInputOperator<?, ?, ?> operator) {
- super(operator);
- }
-
- @Override
- public String getName() {
- return "MapPartition";
- }
-
- @Override
- protected List<OperatorDescriptorSingle> getPossibleProperties() {
- return Collections.<OperatorDescriptorSingle>singletonList(new MapPartitionDescriptor());
- }
-
- /**
- * Computes the estimates for the MapPartition operator.
- * We assume that by default, Map takes one value and transforms it into another value.
- * The cardinality consequently stays the same.
- */
- @Override
- protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/MapPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/MapPartitionDescriptor.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/MapPartitionDescriptor.java
deleted file mode 100644
index 41b707d..0000000
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/MapPartitionDescriptor.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.compiler.operators;
-
-import java.util.Collections;
-import java.util.List;
-
-import eu.stratosphere.compiler.dag.SingleInputNode;
-import eu.stratosphere.compiler.dataproperties.GlobalProperties;
-import eu.stratosphere.compiler.dataproperties.LocalProperties;
-import eu.stratosphere.compiler.dataproperties.RequestedGlobalProperties;
-import eu.stratosphere.compiler.dataproperties.RequestedLocalProperties;
-import eu.stratosphere.compiler.plan.Channel;
-import eu.stratosphere.compiler.plan.SingleInputPlanNode;
-import eu.stratosphere.pact.runtime.task.DriverStrategy;
-
-
-public class MapPartitionDescriptor extends OperatorDescriptorSingle {
-
- @Override
- public DriverStrategy getStrategy() {
- return DriverStrategy.MAP_PARTITION;
- }
-
- @Override
- public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
- return new SingleInputPlanNode(node, "MapPartition ("+node.getPactContract().getName()+")", in, DriverStrategy.MAP_PARTITION);
- }
-
- @Override
- protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
- return Collections.singletonList(new RequestedGlobalProperties());
- }
-
- @Override
- protected List<RequestedLocalProperties> createPossibleLocalProperties() {
- return Collections.singletonList(new RequestedLocalProperties());
- }
-
- @Override
- public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
- return gProps;
- }
-
- @Override
- public LocalProperties computeLocalProperties(LocalProperties lProps) {
- return lProps;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericMapPartition.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericMapPartition.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericMapPartition.java
deleted file mode 100644
index accf731..0000000
--- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericMapPartition.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package eu.stratosphere.api.common.functions;
-
-import eu.stratosphere.util.Collector;
-
-import java.util.Iterator;
-
-public interface GenericMapPartition<T, O> extends Function {
- /**
- * A user-implemented function that modifies or transforms an incoming object.
- *
- * @param records All records for the mapper
- * @param out The collector to hand results to.
- * @throws Exception
- */
- void mapPartition(Iterator<T> records, Collector<O> out) throws Exception;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/base/MapPartitionOperatorBase.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/base/MapPartitionOperatorBase.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/base/MapPartitionOperatorBase.java
deleted file mode 100644
index 9e3552a..0000000
--- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/base/MapPartitionOperatorBase.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.api.common.operators.base;
-
-import eu.stratosphere.api.common.functions.GenericMapPartition;
-import eu.stratosphere.api.common.operators.SingleInputOperator;
-import eu.stratosphere.api.common.operators.UnaryOperatorInformation;
-import eu.stratosphere.api.common.operators.util.UserCodeClassWrapper;
-import eu.stratosphere.api.common.operators.util.UserCodeObjectWrapper;
-import eu.stratosphere.api.common.operators.util.UserCodeWrapper;
-
-
-/**
- *
- * @param <IN> The input type.
- * @param <OUT> The result type.
- * @param <FT> The type of the user-defined function.
- */
-public class MapPartitionOperatorBase<IN, OUT, FT extends GenericMapPartition<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
-
- public MapPartitionOperatorBase(UserCodeWrapper<FT> udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {
- super(udf, operatorInfo, name);
- }
-
- public MapPartitionOperatorBase(FT udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {
- super(new UserCodeObjectWrapper<FT>(udf), operatorInfo, name);
- }
-
- public MapPartitionOperatorBase(Class<? extends FT> udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {
- super(new UserCodeClassWrapper<FT>(udf), operatorInfo, name);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/MapPartitionFunction.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/MapPartitionFunction.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/MapPartitionFunction.java
deleted file mode 100644
index 4c0155f..0000000
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/MapPartitionFunction.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.api.java.functions;
-
-
-import eu.stratosphere.api.common.functions.AbstractFunction;
-import eu.stratosphere.api.common.functions.GenericMapPartition;
-import eu.stratosphere.util.Collector;
-
-import java.util.Iterator;
-
-public abstract class MapPartitionFunction<IN, OUT> extends AbstractFunction implements GenericMapPartition<IN, OUT> {
-
- private static final long serialVersionUID = 1L;
- /**
- *
- * @param records All records for the mapper
- * @param out The collector to hand results to.
- *
- * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
- * to fail and may trigger recovery.
- */
- @Override
- public abstract void mapPartition(Iterator<IN> records, Collector<OUT> out) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/MapPartitionOperator.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/MapPartitionOperator.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/MapPartitionOperator.java
deleted file mode 100644
index 836b205..0000000
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/MapPartitionOperator.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/***********************************************************************************************************************
- *
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- *
- **********************************************************************************************************************/
-package eu.stratosphere.api.java.operators;
-
-import eu.stratosphere.api.common.functions.GenericMapPartition;
-import eu.stratosphere.api.common.operators.Operator;
-import eu.stratosphere.api.common.operators.UnaryOperatorInformation;
-import eu.stratosphere.api.common.operators.base.MapPartitionOperatorBase;
-import eu.stratosphere.api.java.DataSet;
-import eu.stratosphere.api.java.functions.MapFunction;
-import eu.stratosphere.api.java.functions.MapPartitionFunction;
-import eu.stratosphere.api.java.typeutils.TypeExtractor;
-
-/**
- * This operator represents the application of a "mapPartition" function on a data set, and the
- * result data set produced by the function.
- *
- * @param <IN> The type of the data set consumed by the operator.
- * @param <OUT> The type of the data set created by the operator.
- *
- * @see MapFunction
- */
-public class MapPartitionOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapPartitionOperator<IN, OUT>> {
-
- protected final MapPartitionFunction<IN, OUT> function;
-
-
- public MapPartitionOperator(DataSet<IN> input, MapPartitionFunction<IN, OUT> function) {
- super(input, TypeExtractor.getMapPartitionReturnTypes(function, input.getType()));
-
- this.function = function;
- extractSemanticAnnotationsFromUdf(function.getClass());
- }
-
- @Override
- protected eu.stratosphere.api.common.operators.base.MapPartitionOperatorBase<IN, OUT, GenericMapPartition<IN, OUT>> translateToDataFlow(Operator<IN> input) {
-
- String name = getName() != null ? getName() : function.getClass().getName();
- // create operator
- MapPartitionOperatorBase<IN, OUT, GenericMapPartition<IN, OUT>> po = new MapPartitionOperatorBase<IN, OUT, GenericMapPartition<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
- // set input
- po.setInput(input);
- // set dop
- if(this.getParallelism() > 0) {
- // use specified dop
- po.setDegreeOfParallelism(this.getParallelism());
- } else {
- // if no dop has been specified, use dop of input operator to enable chaining
- po.setDegreeOfParallelism(input.getDegreeOfParallelism());
- }
-
- return po;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/MapPartitionFunction.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/MapPartitionFunction.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/MapPartitionFunction.java
deleted file mode 100644
index 5284d06..0000000
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/MapPartitionFunction.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.api.java.record.functions;
-
-import eu.stratosphere.api.common.functions.AbstractFunction;
-import eu.stratosphere.api.common.functions.GenericMapPartition;
-import eu.stratosphere.types.Record;
-import eu.stratosphere.util.Collector;
-
-import java.util.Iterator;
-
-/**
- * The MapPartitionFunction must be extended to provide a map partition implementation
- * By definition, the map partition is called for a full input set.
- */
-public abstract class MapPartitionFunction extends AbstractFunction implements GenericMapPartition<Record, Record> {
-
- private static final long serialVersionUID = 1L;
-
- /**
- * This method must be implemented to provide a user implementation of a mappartitioner.
- * It is called for a full input set.
- *
- * @param records all input records
- * @param out A collector that collects all output records.
- *
- * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the
- * runtime catches an exception, it aborts the map task and lets the fail-over logic
- * decide whether to retry the mapper execution.
- */
- @Override
- public abstract void mapPartition(Iterator<Record> records, Collector<Record> out) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/MapPartitionOperator.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/MapPartitionOperator.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/MapPartitionOperator.java
deleted file mode 100644
index 1ac132c..0000000
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/MapPartitionOperator.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.api.java.record.operators;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import eu.stratosphere.api.common.operators.base.MapPartitionOperatorBase;
-import eu.stratosphere.api.java.record.functions.MapPartitionFunction;
-import org.apache.commons.lang3.Validate;
-
-
-
-import eu.stratosphere.api.common.operators.Operator;
-import eu.stratosphere.api.common.operators.RecordOperator;
-import eu.stratosphere.api.common.operators.util.UserCodeClassWrapper;
-import eu.stratosphere.api.common.operators.util.UserCodeObjectWrapper;
-import eu.stratosphere.api.common.operators.util.UserCodeWrapper;
-import eu.stratosphere.api.java.record.functions.FunctionAnnotation;
-import eu.stratosphere.types.Key;
-import eu.stratosphere.types.Record;
-
-/**
- * MapPartitionOperator that applies a {@link MapPartitionFunction} to each record independently.
- *
- * @see MapPartitionFunction
- */
-public class MapPartitionOperator extends MapPartitionOperatorBase<Record, Record, MapPartitionFunction> implements RecordOperator {
-
- private static String DEFAULT_NAME = "<Unnamed MapPartition>";
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Creates a Builder with the provided {@link MapPartitionFunction} implementation.
- *
- * @param udf The {@link MapPartitionFunction} implementation for this Map operator.
- */
- public static Builder builder(MapPartitionFunction udf) {
- return new Builder(new UserCodeObjectWrapper<MapPartitionFunction>(udf));
- }
-
- /**
- * Creates a Builder with the provided {@link MapPartitionFunction} implementation.
- *
- * @param udf The {@link MapPartitionFunction} implementation for this Map operator.
- */
- public static Builder builder(Class<? extends MapPartitionFunction> udf) {
- return new Builder(new UserCodeClassWrapper<MapPartitionFunction>(udf));
- }
-
- /**
- * The private constructor that only gets invoked from the Builder.
- * @param builder
- */
- protected MapPartitionOperator(Builder builder) {
-
- super(builder.udf, OperatorInfoHelper.unary(), builder.name);
-
- if (builder.inputs != null && !builder.inputs.isEmpty()) {
- setInput(Operator.createUnionCascade(builder.inputs));
- }
-
- setBroadcastVariables(builder.broadcastInputs);
- setSemanticProperties(FunctionAnnotation.readSingleConstantAnnotations(builder.udf));
- }
-
-
- @Override
- public Class<? extends Key<?>>[] getKeyClasses() {
- return emptyClassArray();
- }
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Builder pattern, straight from Joshua Bloch's Effective Java (2nd Edition).
- */
- public static class Builder {
-
- /* The required parameters */
- private final UserCodeWrapper<MapPartitionFunction> udf;
-
- /* The optional parameters */
- private List<Operator<Record>> inputs;
- private Map<String, Operator<Record>> broadcastInputs;
- private String name = DEFAULT_NAME;
-
- /**
- * Creates a Builder with the provided {@link MapPartitionFunction} implementation.
- *
- * @param udf The {@link MapPartitionFunction} implementation for this Map operator.
- */
- private Builder(UserCodeWrapper<MapPartitionFunction> udf) {
- this.udf = udf;
- this.inputs = new ArrayList<Operator<Record>>();
- this.broadcastInputs = new HashMap<String, Operator<Record>>();
- }
-
- /**
- * Sets the input.
- *
- * @param input The input.
- */
- public Builder input(Operator<Record> input) {
- Validate.notNull(input, "The input must not be null");
-
- this.inputs.clear();
- this.inputs.add(input);
- return this;
- }
-
- /**
- * Sets one or several inputs (union).
- *
- * @param inputs
- */
- public Builder input(Operator<Record>...inputs) {
- this.inputs.clear();
- for (Operator<Record> c : inputs) {
- this.inputs.add(c);
- }
- return this;
- }
-
- /**
- * Sets the inputs.
- *
- * @param inputs
- */
- public Builder inputs(List<Operator<Record>> inputs) {
- this.inputs = inputs;
- return this;
- }
-
- /**
- * Binds the result produced by a plan rooted at {@code root} to a
- * variable used by the UDF wrapped in this operator.
- */
- public Builder setBroadcastVariable(String name, Operator<Record> input) {
- this.broadcastInputs.put(name, input);
- return this;
- }
-
- /**
- * Binds multiple broadcast variables.
- */
- public Builder setBroadcastVariables(Map<String, Operator<Record>> inputs) {
- this.broadcastInputs.clear();
- this.broadcastInputs.putAll(inputs);
- return this;
- }
-
- /**
- * Sets the name of this operator.
- *
- * @param name
- */
- public Builder name(String name) {
- this.name = name;
- return this;
- }
-
- /**
- * Creates and returns a MapOperator from using the values given
- * to the builder.
- *
- * @return The created operator
- */
- public MapPartitionOperator build() {
- if (name == null) {
- name = udf.getUserCodeClass().getName();
- }
- return new MapPartitionOperator(this);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java
deleted file mode 100644
index 0ec5f15..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.pact.runtime.task;
-
-import eu.stratosphere.api.common.functions.GenericMapPartition;
-import eu.stratosphere.pact.runtime.util.MutableToRegularIteratorWrapper;
-import eu.stratosphere.util.Collector;
-import eu.stratosphere.util.MutableObjectIterator;
-
-
-/**
- * MapPartition task which is executed by a Nephele task manager. The task has a single
- * input and one or multiple outputs. It is provided with a MapFunction
- * implementation.
- * <p>
- * The MapPartitionTask creates an iterator over all key-value pairs of its input and hands that to the <code>map_partition()</code> method
- * of the MapFunction.
- *
- * @see GenericMapPartition
- *
- * @param <IT> The mapper's input data type.
- * @param <OT> The mapper's output data type.
- */
-public class MapPartitionDriver<IT, OT> implements PactDriver<GenericMapPartition<IT, OT>, OT> {
-
- private PactTaskContext<GenericMapPartition<IT, OT>, OT> taskContext;
-
- private volatile boolean running;
-
-
- @Override
- public void setup(PactTaskContext<GenericMapPartition<IT, OT>, OT> context) {
- this.taskContext = context;
- this.running = true;
- }
-
- @Override
- public int getNumberOfInputs() {
- return 1;
- }
-
- @Override
- public Class<GenericMapPartition<IT, OT>> getStubType() {
- @SuppressWarnings("unchecked")
- final Class<GenericMapPartition<IT, OT>> clazz = (Class<GenericMapPartition<IT, OT>>) (Class<?>) GenericMapPartition.class;
- return clazz;
- }
-
- @Override
- public boolean requiresComparatorOnInput() {
- return false;
- }
-
- @Override
- public void prepare() {
- // nothing, since a mapper does not need any preparation
- }
-
- @Override
- public void run() throws Exception {
- // cache references on the stack
- final MutableObjectIterator<IT> input = this.taskContext.getInput(0);
- final GenericMapPartition<IT, OT> function = this.taskContext.getStub();
- final Collector<OT> output = this.taskContext.getOutputCollector();
-
- final MutableToRegularIteratorWrapper<IT> inIter = new MutableToRegularIteratorWrapper<IT>(input, this.taskContext.<IT>getInputSerializer(0).getSerializer() );
- IT record = this.taskContext.<IT>getInputSerializer(0).getSerializer().createInstance();
-
- function.mapPartition(inIter, output);
- }
-
- @Override
- public void cleanup() {
- // mappers need no cleanup, since no strategies are used.
- }
-
- @Override
- public void cancel() {
- this.running = false;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0cead7e/stratosphere-tests/src/test/java/eu/stratosphere/test/operators/MapPartitionITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/operators/MapPartitionITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/operators/MapPartitionITCase.java
deleted file mode 100644
index 5da0f99..0000000
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/operators/MapPartitionITCase.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.test.operators;
-
-import eu.stratosphere.api.common.Plan;
-import eu.stratosphere.api.java.record.operators.FileDataSink;
-import eu.stratosphere.api.java.record.operators.FileDataSource;
-import eu.stratosphere.api.java.record.functions.MapPartitionFunction;
-import eu.stratosphere.api.java.record.io.DelimitedInputFormat;
-import eu.stratosphere.api.java.record.operators.MapPartitionOperator;
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.test.operators.io.ContractITCaseIOFormats.ContractITCaseInputFormat;
-import eu.stratosphere.test.operators.io.ContractITCaseIOFormats.ContractITCaseOutputFormat;
-import eu.stratosphere.test.util.RecordAPITestBase;
-import eu.stratosphere.types.IntValue;
-import eu.stratosphere.types.Record;
-import eu.stratosphere.types.StringValue;
-import eu.stratosphere.util.Collector;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-
-@RunWith(Parameterized.class)
-public class MapPartitionITCase extends RecordAPITestBase {
-
- private static final Log LOG = LogFactory.getLog(MapITCase.class);
-
- String inPath = null;
- String resultPath = null;
-
- public MapPartitionITCase(Configuration testConfig) {
- super(testConfig);
- }
-
- private static final String IN = "1 1\n2 2\n2 8\n4 4\n4 4\n6 6\n7 7\n8 8\n" +
- "1 1\n2 2\n2 2\n4 4\n4 4\n6 3\n5 9\n8 8\n1 1\n2 2\n2 2\n3 0\n4 4\n" +
- "5 9\n7 7\n8 8\n1 1\n9 1\n5 9\n4 4\n4 4\n6 6\n7 7\n8 8\n";
-
- private static final String RESULT = "1 11\n2 12\n4 14\n4 14\n1 11\n2 12\n2 12\n4 14\n4 14\n3 16\n1 11\n2 12\n2 12\n0 13\n4 14\n1 11\n4 14\n4 14\n";
-
- @Override
- protected void preSubmit() throws Exception {
- inPath = createTempFile("in.txt", IN);
- resultPath = getTempDirPath("result");
- }
-
- public static class TestMapPartition extends MapPartitionFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private StringValue keyString = new StringValue();
- private StringValue valueString = new StringValue();
-
-
- @Override
- public void mapPartition(Iterator<Record> records, Collector<Record> out) throws Exception {
- while(records.hasNext() ){
- Record record = records.next();
- keyString = record.getField(0, keyString);
- valueString = record.getField(1, valueString);
-
- LOG.debug("Processed: [" + keyString.toString() + "," + valueString.getValue() + "]");
-
- if (Integer.parseInt(keyString.toString()) + Integer.parseInt(valueString.toString()) < 10) {
-
- record.setField(0, valueString);
- record.setField(1, new IntValue(Integer.parseInt(keyString.toString()) + 10));
-
- out.collect(record);
- }
- }
- }
- }
-
- @Override
- protected Plan getTestJob() {
- FileDataSource input = new FileDataSource(
- new ContractITCaseInputFormat(), inPath);
- DelimitedInputFormat.configureDelimitedFormat(input)
- .recordDelimiter('\n');
- input.setDegreeOfParallelism(config.getInteger("MapPartitionTest#NoSubtasks", 1));
-
- MapPartitionOperator testMapper = MapPartitionOperator.builder(new TestMapPartition()).build();
- testMapper.setDegreeOfParallelism(config.getInteger("TestMapPartition#NoSubtasks", 1));
-
- FileDataSink output = new FileDataSink(
- new ContractITCaseOutputFormat(), resultPath);
- output.setDegreeOfParallelism(1);
-
- output.setInput(testMapper);
- testMapper.setInput(input);
-
- return new Plan(output);
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(RESULT, resultPath);
- }
-
- @Parameters
- public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
- LinkedList<Configuration> testConfigs = new LinkedList<Configuration>();
-
- Configuration config = new Configuration();
- config.setInteger("MapPartitionTest#NoSubtasks", 4);
- testConfigs.add(config);
-
- return toParameterList(testConfigs);
- }
-}