You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/08/09 14:39:48 UTC
[17/39] [FLINK-701] Refactor Java API to use SAM interfaces.
Introduce RichFunction stubs for all UDFs.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
index da0f222..0727d63 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.operators;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.GenericCombine;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.util.MutableToRegularIteratorWrapper;
@@ -37,13 +37,13 @@ import org.apache.flink.util.MutableObjectIterator;
* The GroupReduceTask creates a iterator over all records from its input. The iterator returns all records grouped by their
* key. The iterator is handed to the <code>reduce()</code> method of the GroupReduceFunction.
*
- * @see GenericGroupReduce
+ * @see org.apache.flink.api.common.functions.GroupReduceFunction
*/
-public class AllGroupReduceDriver<IT, OT> implements PactDriver<GenericGroupReduce<IT, OT>, OT> {
+public class AllGroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunction<IT, OT>, OT> {
private static final Log LOG = LogFactory.getLog(AllGroupReduceDriver.class);
- private PactTaskContext<GenericGroupReduce<IT, OT>, OT> taskContext;
+ private PactTaskContext<GroupReduceFunction<IT, OT>, OT> taskContext;
private MutableObjectIterator<IT> input;
@@ -54,7 +54,7 @@ public class AllGroupReduceDriver<IT, OT> implements PactDriver<GenericGroupRedu
// ------------------------------------------------------------------------
@Override
- public void setup(PactTaskContext<GenericGroupReduce<IT, OT>, OT> context) {
+ public void setup(PactTaskContext<GroupReduceFunction<IT, OT>, OT> context) {
this.taskContext = context;
}
@@ -64,9 +64,9 @@ public class AllGroupReduceDriver<IT, OT> implements PactDriver<GenericGroupRedu
}
@Override
- public Class<GenericGroupReduce<IT, OT>> getStubType() {
+ public Class<GroupReduceFunction<IT, OT>> getStubType() {
@SuppressWarnings("unchecked")
- final Class<GenericGroupReduce<IT, OT>> clazz = (Class<GenericGroupReduce<IT, OT>>) (Class<?>) GenericGroupReduce.class;
+ final Class<GroupReduceFunction<IT, OT>> clazz = (Class<GroupReduceFunction<IT, OT>>) (Class<?>) GroupReduceFunction.class;
return clazz;
}
@@ -83,8 +83,8 @@ public class AllGroupReduceDriver<IT, OT> implements PactDriver<GenericGroupRedu
this.strategy = config.getDriverStrategy();
if (strategy == DriverStrategy.ALL_GROUP_COMBINE) {
- if (!(this.taskContext.getStub() instanceof GenericCombine)) {
- throw new Exception("Using combiner on a UDF that does not implement the combiner interface " + GenericCombine.class.getName());
+ if (!(this.taskContext.getStub() instanceof FlatCombineFunction)) {
+ throw new Exception("Using combiner on a UDF that does not implement the combiner interface " + FlatCombineFunction.class.getName());
}
}
else if (strategy != DriverStrategy.ALL_GROUP_REDUCE) {
@@ -105,13 +105,13 @@ public class AllGroupReduceDriver<IT, OT> implements PactDriver<GenericGroupRedu
// single UDF call with the single group
if (inIter.hasNext()) {
if (strategy == DriverStrategy.ALL_GROUP_REDUCE) {
- final GenericGroupReduce<IT, OT> reducer = this.taskContext.getStub();
+ final GroupReduceFunction<IT, OT> reducer = this.taskContext.getStub();
final Collector<OT> output = this.taskContext.getOutputCollector();
reducer.reduce(inIter, output);
}
else {
@SuppressWarnings("unchecked")
- final GenericCombine<IT> combiner = (GenericCombine<IT>) this.taskContext.getStub();
+ final FlatCombineFunction<IT> combiner = (FlatCombineFunction<IT>) this.taskContext.getStub();
@SuppressWarnings("unchecked")
final Collector<IT> output = (Collector<IT>) this.taskContext.getOutputCollector();
combiner.combine(inIter, output);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
index 30bfae3..721f4f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.GenericReduce;
+import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.runtime.operators.util.TaskConfig;
@@ -35,13 +35,13 @@ import org.apache.flink.util.MutableObjectIterator;
* The ReduceTask creates a iterator over all records from its input. The iterator returns all records grouped by their
* key. The iterator is handed to the <code>reduce()</code> method of the ReduceFunction.
*
- * @see GenericReduce
+ * @see org.apache.flink.api.common.functions.ReduceFunction
*/
-public class AllReduceDriver<T> implements PactDriver<GenericReduce<T>, T> {
+public class AllReduceDriver<T> implements PactDriver<ReduceFunction<T>, T> {
private static final Log LOG = LogFactory.getLog(AllReduceDriver.class);
- private PactTaskContext<GenericReduce<T>, T> taskContext;
+ private PactTaskContext<ReduceFunction<T>, T> taskContext;
private MutableObjectIterator<T> input;
@@ -52,7 +52,7 @@ public class AllReduceDriver<T> implements PactDriver<GenericReduce<T>, T> {
// ------------------------------------------------------------------------
@Override
- public void setup(PactTaskContext<GenericReduce<T>, T> context) {
+ public void setup(PactTaskContext<ReduceFunction<T>, T> context) {
this.taskContext = context;
this.running = true;
}
@@ -63,9 +63,9 @@ public class AllReduceDriver<T> implements PactDriver<GenericReduce<T>, T> {
}
@Override
- public Class<GenericReduce<T>> getStubType() {
+ public Class<ReduceFunction<T>> getStubType() {
@SuppressWarnings("unchecked")
- final Class<GenericReduce<T>> clazz = (Class<GenericReduce<T>>) (Class<?>) GenericReduce.class;
+ final Class<ReduceFunction<T>> clazz = (Class<ReduceFunction<T>>) (Class<?>) ReduceFunction.class;
return clazz;
}
@@ -94,7 +94,7 @@ public class AllReduceDriver<T> implements PactDriver<GenericReduce<T>, T> {
LOG.debug(this.taskContext.formatLogString("AllReduce preprocessing done. Running Reducer code."));
}
- final GenericReduce<T> stub = this.taskContext.getStub();
+ final ReduceFunction<T> stub = this.taskContext.getStub();
final MutableObjectIterator<T> input = this.input;
final TypeSerializer<T> serializer = this.serializer;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
index 3da451a..8ff0262 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.GenericCoGrouper;
+import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -41,12 +41,12 @@ import org.apache.flink.util.MutableObjectIterator;
*
* @see org.apache.flink.api.java.record.functions.CoGroupFunction
*/
-public class CoGroupDriver<IT1, IT2, OT> implements PactDriver<GenericCoGrouper<IT1, IT2, OT>, OT> {
+public class CoGroupDriver<IT1, IT2, OT> implements PactDriver<CoGroupFunction<IT1, IT2, OT>, OT> {
private static final Log LOG = LogFactory.getLog(CoGroupDriver.class);
- private PactTaskContext<GenericCoGrouper<IT1, IT2, OT>, OT> taskContext;
+ private PactTaskContext<CoGroupFunction<IT1, IT2, OT>, OT> taskContext;
private CoGroupTaskIterator<IT1, IT2> coGroupIterator; // the iterator that does the actual cogroup
@@ -56,7 +56,7 @@ public class CoGroupDriver<IT1, IT2, OT> implements PactDriver<GenericCoGrouper<
@Override
- public void setup(PactTaskContext<GenericCoGrouper<IT1, IT2, OT>, OT> context) {
+ public void setup(PactTaskContext<CoGroupFunction<IT1, IT2, OT>, OT> context) {
this.taskContext = context;
this.running = true;
}
@@ -69,9 +69,9 @@ public class CoGroupDriver<IT1, IT2, OT> implements PactDriver<GenericCoGrouper<
@Override
- public Class<GenericCoGrouper<IT1, IT2, OT>> getStubType() {
+ public Class<CoGroupFunction<IT1, IT2, OT>> getStubType() {
@SuppressWarnings("unchecked")
- final Class<GenericCoGrouper<IT1, IT2, OT>> clazz = (Class<GenericCoGrouper<IT1, IT2, OT>>) (Class<?>) GenericCoGrouper.class;
+ final Class<CoGroupFunction<IT1, IT2, OT>> clazz = (Class<CoGroupFunction<IT1, IT2, OT>>) (Class<?>) CoGroupFunction.class;
return clazz;
}
@@ -122,7 +122,7 @@ public class CoGroupDriver<IT1, IT2, OT> implements PactDriver<GenericCoGrouper<
@Override
public void run() throws Exception
{
- final GenericCoGrouper<IT1, IT2, OT> coGroupStub = this.taskContext.getStub();
+ final CoGroupFunction<IT1, IT2, OT> coGroupStub = this.taskContext.getStub();
final Collector<OT> collector = this.taskContext.getOutputCollector();
final CoGroupTaskIterator<IT1, IT2> coGroupIterator = this.coGroupIterator;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
index 9d06618..8761a2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators;
import java.util.Iterator;
-import org.apache.flink.api.common.functions.GenericCoGrouper;
+import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
@@ -34,9 +34,9 @@ import org.apache.flink.runtime.util.KeyGroupedIterator;
import org.apache.flink.runtime.util.SingleElementIterator;
import org.apache.flink.util.Collector;
-public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettablePactDriver<GenericCoGrouper<IT1, IT2, OT>, OT> {
+public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettablePactDriver<CoGroupFunction<IT1, IT2, OT>, OT> {
- private PactTaskContext<GenericCoGrouper<IT1, IT2, OT>, OT> taskContext;
+ private PactTaskContext<CoGroupFunction<IT1, IT2, OT>, OT> taskContext;
private CompactingHashTable<IT1> hashTable;
@@ -53,7 +53,7 @@ public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements Resettab
// --------------------------------------------------------------------------------------------
@Override
- public void setup(PactTaskContext<GenericCoGrouper<IT1, IT2, OT>, OT> context) {
+ public void setup(PactTaskContext<CoGroupFunction<IT1, IT2, OT>, OT> context) {
this.taskContext = context;
this.running = true;
}
@@ -64,9 +64,9 @@ public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements Resettab
}
@Override
- public Class<GenericCoGrouper<IT1, IT2, OT>> getStubType() {
+ public Class<CoGroupFunction<IT1, IT2, OT>> getStubType() {
@SuppressWarnings("unchecked")
- final Class<GenericCoGrouper<IT1, IT2, OT>> clazz = (Class<GenericCoGrouper<IT1, IT2, OT>>) (Class<?>) GenericCoGrouper.class;
+ final Class<CoGroupFunction<IT1, IT2, OT>> clazz = (Class<CoGroupFunction<IT1, IT2, OT>>) (Class<?>) CoGroupFunction.class;
return clazz;
}
@@ -123,7 +123,7 @@ public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements Resettab
@Override
public void run() throws Exception {
- final GenericCoGrouper<IT1, IT2, OT> coGroupStub = taskContext.getStub();
+ final CoGroupFunction<IT1, IT2, OT> coGroupStub = taskContext.getStub();
final Collector<OT> collector = taskContext.getOutputCollector();
IT1 buildSideRecord = solutionSideRecord;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
index 80fa855..f2020c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators;
import java.util.Iterator;
-import org.apache.flink.api.common.functions.GenericCoGrouper;
+import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
@@ -34,9 +34,9 @@ import org.apache.flink.runtime.util.KeyGroupedIterator;
import org.apache.flink.runtime.util.SingleElementIterator;
import org.apache.flink.util.Collector;
-public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements ResettablePactDriver<GenericCoGrouper<IT1, IT2, OT>, OT> {
+public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements ResettablePactDriver<CoGroupFunction<IT1, IT2, OT>, OT> {
- private PactTaskContext<GenericCoGrouper<IT1, IT2, OT>, OT> taskContext;
+ private PactTaskContext<CoGroupFunction<IT1, IT2, OT>, OT> taskContext;
private CompactingHashTable<IT2> hashTable;
@@ -53,7 +53,7 @@ public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resetta
// --------------------------------------------------------------------------------------------
@Override
- public void setup(PactTaskContext<GenericCoGrouper<IT1, IT2, OT>, OT> context) {
+ public void setup(PactTaskContext<CoGroupFunction<IT1, IT2, OT>, OT> context) {
this.taskContext = context;
this.running = true;
}
@@ -64,9 +64,9 @@ public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resetta
}
@Override
- public Class<GenericCoGrouper<IT1, IT2, OT>> getStubType() {
+ public Class<CoGroupFunction<IT1, IT2, OT>> getStubType() {
@SuppressWarnings("unchecked")
- final Class<GenericCoGrouper<IT1, IT2, OT>> clazz = (Class<GenericCoGrouper<IT1, IT2, OT>>) (Class<?>) GenericCoGrouper.class;
+ final Class<CoGroupFunction<IT1, IT2, OT>> clazz = (Class<CoGroupFunction<IT1, IT2, OT>>) (Class<?>) CoGroupFunction.class;
return clazz;
}
@@ -123,7 +123,7 @@ public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resetta
@Override
public void run() throws Exception {
- final GenericCoGrouper<IT1, IT2, OT> coGroupStub = taskContext.getStub();
+ final CoGroupFunction<IT1, IT2, OT> coGroupStub = taskContext.getStub();
final Collector<OT> collector = taskContext.getOutputCollector();
IT2 buildSideRecord = solutionSideRecord;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
index 7c311ed..b68f24d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.GenericCrosser;
+import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator;
@@ -40,12 +40,12 @@ import org.apache.flink.util.MutableObjectIterator;
*
* @see org.apache.flink.api.java.functions.CrossFunction
*/
-public class CrossDriver<T1, T2, OT> implements PactDriver<GenericCrosser<T1, T2, OT>, OT> {
+public class CrossDriver<T1, T2, OT> implements PactDriver<CrossFunction<T1, T2, OT>, OT> {
private static final Log LOG = LogFactory.getLog(CrossDriver.class);
- private PactTaskContext<GenericCrosser<T1, T2, OT>, OT> taskContext;
+ private PactTaskContext<CrossFunction<T1, T2, OT>, OT> taskContext;
private MemoryManager memManager;
@@ -67,7 +67,7 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<GenericCrosser<T1, T2
@Override
- public void setup(PactTaskContext<GenericCrosser<T1, T2, OT>, OT> context) {
+ public void setup(PactTaskContext<CrossFunction<T1, T2, OT>, OT> context) {
this.taskContext = context;
this.running = true;
}
@@ -80,9 +80,9 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<GenericCrosser<T1, T2
@Override
- public Class<GenericCrosser<T1, T2, OT>> getStubType() {
+ public Class<CrossFunction<T1, T2, OT>> getStubType() {
@SuppressWarnings("unchecked")
- final Class<GenericCrosser<T1, T2, OT>> clazz = (Class<GenericCrosser<T1, T2, OT>>) (Class<?>) GenericCrosser.class;
+ final Class<CrossFunction<T1, T2, OT>> clazz = (Class<CrossFunction<T1, T2, OT>>) (Class<?>) CrossFunction.class;
return clazz;
}
@@ -207,7 +207,7 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<GenericCrosser<T1, T2
final T2 val2Reuse = serializer2.createInstance();
T2 val2Copy = serializer2.createInstance();
- final GenericCrosser<T1, T2, OT> crosser = this.taskContext.getStub();
+ final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub();
final Collector<OT> collector = this.taskContext.getOutputCollector();
// for all blocks
@@ -217,7 +217,8 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<GenericCrosser<T1, T2
// for all values in the block
while ((val1 = blockVals.next(val1Reuse)) != null) {
val2Copy = serializer2.copy(val2, val2Copy);
- crosser.cross(val1, val2Copy, collector);
+ collector.collect(crosser.cross(val1,val2Copy));
+ //crosser.cross(val1, val2Copy, collector);
}
blockVals.reset();
}
@@ -254,7 +255,7 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<GenericCrosser<T1, T2
T2 val2;
final T2 val2Reuse = serializer2.createInstance();
- final GenericCrosser<T1, T2, OT> crosser = this.taskContext.getStub();
+ final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub();
final Collector<OT> collector = this.taskContext.getOutputCollector();
// for all blocks
@@ -264,7 +265,8 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<GenericCrosser<T1, T2
// for all values in the block
while (this.running && ((val2 = blockVals.next(val2Reuse)) != null)) {
val1Copy = serializer1.copy(val1, val1Copy);
- crosser.cross(val1Copy, val2, collector);
+ collector.collect(crosser.cross(val1Copy, val2));
+ //crosser.cross(val1Copy, val2, collector);
}
blockVals.reset();
}
@@ -296,7 +298,7 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<GenericCrosser<T1, T2
T2 val2;
final T2 val2Reuse = serializer2.createInstance();
- final GenericCrosser<T1, T2, OT> crosser = this.taskContext.getStub();
+ final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub();
final Collector<OT> collector = this.taskContext.getOutputCollector();
// for all blocks
@@ -304,7 +306,8 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<GenericCrosser<T1, T2
// for all values from the spilling side
while (this.running && ((val2 = spillVals.next(val2Reuse)) != null)) {
val1Copy = serializer1.copy(val1, val1Copy);
- crosser.cross(val1Copy, val2, collector);
+ collector.collect(crosser.cross(val1Copy, val2));
+ //crosser.cross(val1Copy, val2, collector);
}
spillVals.reset();
}
@@ -332,7 +335,7 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<GenericCrosser<T1, T2
final T2 val2Reuse = serializer2.createInstance();
T2 val2Copy = serializer2.createInstance();
- final GenericCrosser<T1, T2, OT> crosser = this.taskContext.getStub();
+ final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub();
final Collector<OT> collector = this.taskContext.getOutputCollector();
// for all blocks
@@ -340,7 +343,8 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<GenericCrosser<T1, T2
// for all values from the spilling side
while (this.running && (val1 = spillVals.next(val1Reuse)) != null) {
val2Copy = serializer2.copy(val2, val2Copy);
- crosser.cross(val1, val2Copy, collector);
+ collector.collect(crosser.cross(val1, val2Copy));
+ //crosser.cross(val1, val2Copy, collector);
}
spillVals.reset();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
index 44f22a0..130601b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.operators;
-import org.apache.flink.api.common.functions.GenericFlatMap;
+import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
@@ -31,20 +31,20 @@ import org.apache.flink.util.MutableObjectIterator;
* The MapTask creates an iterator over all key-value pairs of its input and hands that to the <code>map()</code> method
* of the MapFunction.
*
- * @see GenericFlatMap
+ * @see org.apache.flink.api.common.functions.FlatMapFunction
*
* @param <IT> The mapper's input data type.
* @param <OT> The mapper's output data type.
*/
-public class FlatMapDriver<IT, OT> implements PactDriver<GenericFlatMap<IT, OT>, OT> {
+public class FlatMapDriver<IT, OT> implements PactDriver<FlatMapFunction<IT, OT>, OT> {
- private PactTaskContext<GenericFlatMap<IT, OT>, OT> taskContext;
+ private PactTaskContext<FlatMapFunction<IT, OT>, OT> taskContext;
private volatile boolean running;
@Override
- public void setup(PactTaskContext<GenericFlatMap<IT, OT>, OT> context) {
+ public void setup(PactTaskContext<FlatMapFunction<IT, OT>, OT> context) {
this.taskContext = context;
this.running = true;
}
@@ -55,9 +55,9 @@ public class FlatMapDriver<IT, OT> implements PactDriver<GenericFlatMap<IT, OT>,
}
@Override
- public Class<GenericFlatMap<IT, OT>> getStubType() {
+ public Class<FlatMapFunction<IT, OT>> getStubType() {
@SuppressWarnings("unchecked")
- final Class<GenericFlatMap<IT, OT>> clazz = (Class<GenericFlatMap<IT, OT>>) (Class<?>) GenericFlatMap.class;
+ final Class<FlatMapFunction<IT, OT>> clazz = (Class<FlatMapFunction<IT, OT>>) (Class<?>) FlatMapFunction.class;
return clazz;
}
@@ -75,7 +75,7 @@ public class FlatMapDriver<IT, OT> implements PactDriver<GenericFlatMap<IT, OT>,
public void run() throws Exception {
// cache references on the stack
final MutableObjectIterator<IT> input = this.taskContext.getInput(0);
- final GenericFlatMap<IT, OT> function = this.taskContext.getStub();
+ final FlatMapFunction<IT, OT> function = this.taskContext.getStub();
final Collector<OT> output = this.taskContext.getOutputCollector();
IT record = this.taskContext.<IT>getInputSerializer(0).getSerializer().createInstance();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
index 1d0749c..f786c56 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.GenericCombine;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -39,12 +39,12 @@ import org.apache.flink.util.MutableObjectIterator;
*
* @param <T> The data type consumed and produced by the combiner.
*/
-public class GroupReduceCombineDriver<T> implements PactDriver<GenericCombine<T>, T> {
+public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFunction<T>, T> {
private static final Log LOG = LogFactory.getLog(GroupReduceCombineDriver.class);
- private PactTaskContext<GenericCombine<T>, T> taskContext;
+ private PactTaskContext<FlatCombineFunction<T>, T> taskContext;
private CloseableInputProvider<T> input;
@@ -57,7 +57,7 @@ public class GroupReduceCombineDriver<T> implements PactDriver<GenericCombine<T>
// ------------------------------------------------------------------------
@Override
- public void setup(PactTaskContext<GenericCombine<T>, T> context) {
+ public void setup(PactTaskContext<FlatCombineFunction<T>, T> context) {
this.taskContext = context;
this.running = true;
}
@@ -68,9 +68,9 @@ public class GroupReduceCombineDriver<T> implements PactDriver<GenericCombine<T>
}
@Override
- public Class<GenericCombine<T>> getStubType() {
+ public Class<FlatCombineFunction<T>> getStubType() {
@SuppressWarnings("unchecked")
- final Class<GenericCombine<T>> clazz = (Class<GenericCombine<T>>) (Class<?>) GenericCombine.class;
+ final Class<FlatCombineFunction<T>> clazz = (Class<FlatCombineFunction<T>>) (Class<?>) FlatCombineFunction.class;
return clazz;
}
@@ -111,7 +111,7 @@ public class GroupReduceCombineDriver<T> implements PactDriver<GenericCombine<T>
this.serializerFactory.getSerializer(), this.comparator);
// cache references on the stack
- final GenericCombine<T> stub = this.taskContext.getStub();
+ final FlatCombineFunction<T> stub = this.taskContext.getStub();
final Collector<T> output = this.taskContext.getOutputCollector();
// run stub implementation
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
index 1ab080c..960143d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.operators.util.TaskConfig;
@@ -37,13 +37,13 @@ import org.apache.flink.util.MutableObjectIterator;
* The GroupReduceTask creates a iterator over all records from its input. The iterator returns all records grouped by their
* key. The iterator is handed to the <code>reduce()</code> method of the GroupReduceFunction.
*
- * @see GenericGroupReduce
+ * @see org.apache.flink.api.common.functions.GroupReduceFunction
*/
-public class GroupReduceDriver<IT, OT> implements PactDriver<GenericGroupReduce<IT, OT>, OT> {
+public class GroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunction<IT, OT>, OT> {
private static final Log LOG = LogFactory.getLog(GroupReduceDriver.class);
- private PactTaskContext<GenericGroupReduce<IT, OT>, OT> taskContext;
+ private PactTaskContext<GroupReduceFunction<IT, OT>, OT> taskContext;
private MutableObjectIterator<IT> input;
@@ -56,7 +56,7 @@ public class GroupReduceDriver<IT, OT> implements PactDriver<GenericGroupReduce<
// ------------------------------------------------------------------------
@Override
- public void setup(PactTaskContext<GenericGroupReduce<IT, OT>, OT> context) {
+ public void setup(PactTaskContext<GroupReduceFunction<IT, OT>, OT> context) {
this.taskContext = context;
this.running = true;
}
@@ -67,9 +67,9 @@ public class GroupReduceDriver<IT, OT> implements PactDriver<GenericGroupReduce<
}
@Override
- public Class<GenericGroupReduce<IT, OT>> getStubType() {
+ public Class<GroupReduceFunction<IT, OT>> getStubType() {
@SuppressWarnings("unchecked")
- final Class<GenericGroupReduce<IT, OT>> clazz = (Class<GenericGroupReduce<IT, OT>>) (Class<?>) GenericGroupReduce.class;
+ final Class<GroupReduceFunction<IT, OT>> clazz = (Class<GroupReduceFunction<IT, OT>>) (Class<?>) GroupReduceFunction.class;
return clazz;
}
@@ -100,7 +100,7 @@ public class GroupReduceDriver<IT, OT> implements PactDriver<GenericGroupReduce<
final KeyGroupedIterator<IT> iter = new KeyGroupedIterator<IT>(this.input, this.serializer, this.comparator);
// cache references on the stack
- final GenericGroupReduce<IT, OT> stub = this.taskContext.getStub();
+ final GroupReduceFunction<IT, OT> stub = this.taskContext.getStub();
final Collector<OT> output = this.taskContext.getOutputCollector();
// run stub implementation
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
index b23b0cd..342f307 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.operators;
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
import org.apache.flink.api.common.typeutils.TypePairComparator;
@@ -32,9 +32,9 @@ import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
-public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettablePactDriver<GenericJoiner<IT1, IT2, OT>, OT> {
+public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettablePactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> {
- private PactTaskContext<GenericJoiner<IT1, IT2, OT>, OT> taskContext;
+ private PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext;
private CompactingHashTable<IT1> hashTable;
@@ -50,7 +50,7 @@ public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettableP
// --------------------------------------------------------------------------------------------
@Override
- public void setup(PactTaskContext<GenericJoiner<IT1, IT2, OT>, OT> context) {
+ public void setup(PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> context) {
this.taskContext = context;
this.running = true;
}
@@ -61,9 +61,9 @@ public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettableP
}
@Override
- public Class<GenericJoiner<IT1, IT2, OT>> getStubType() {
+ public Class<FlatJoinFunction<IT1, IT2, OT>> getStubType() {
@SuppressWarnings("unchecked")
- final Class<GenericJoiner<IT1, IT2, OT>> clazz = (Class<GenericJoiner<IT1, IT2, OT>>) (Class<?>) GenericJoiner.class;
+ final Class<FlatJoinFunction<IT1, IT2, OT>> clazz = (Class<FlatJoinFunction<IT1, IT2, OT>>) (Class<?>) FlatJoinFunction.class;
return clazz;
}
@@ -126,7 +126,7 @@ public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettableP
@Override
public void run() throws Exception {
- final GenericJoiner<IT1, IT2, OT> joinFunction = taskContext.getStub();
+ final FlatJoinFunction<IT1, IT2, OT> joinFunction = taskContext.getStub();
final Collector<OT> collector = taskContext.getOutputCollector();
IT1 buildSideRecord = this.solutionSideRecord;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
index 4fa5c5a..c38a81a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.operators;
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
import org.apache.flink.api.common.typeutils.TypePairComparator;
@@ -32,9 +32,9 @@ import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
-public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> implements ResettablePactDriver<GenericJoiner<IT1, IT2, OT>, OT> {
+public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> implements ResettablePactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> {
- private PactTaskContext<GenericJoiner<IT1, IT2, OT>, OT> taskContext;
+ private PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext;
private CompactingHashTable<IT2> hashTable;
@@ -50,7 +50,7 @@ public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resettable
// --------------------------------------------------------------------------------------------
@Override
- public void setup(PactTaskContext<GenericJoiner<IT1, IT2, OT>, OT> context) {
+ public void setup(PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> context) {
this.taskContext = context;
this.running = true;
}
@@ -61,9 +61,9 @@ public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resettable
}
@Override
- public Class<GenericJoiner<IT1, IT2, OT>> getStubType() {
+ public Class<FlatJoinFunction<IT1, IT2, OT>> getStubType() {
@SuppressWarnings("unchecked")
- final Class<GenericJoiner<IT1, IT2, OT>> clazz = (Class<GenericJoiner<IT1, IT2, OT>>) (Class<?>) GenericJoiner.class;
+ final Class<FlatJoinFunction<IT1, IT2, OT>> clazz = (Class<FlatJoinFunction<IT1, IT2, OT>>) (Class<?>) FlatJoinFunction.class;
return clazz;
}
@@ -126,7 +126,7 @@ public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resettable
@Override
public void run() throws Exception {
- final GenericJoiner<IT1, IT2, OT> joinFunction = taskContext.getStub();
+ final FlatJoinFunction<IT1, IT2, OT> joinFunction = taskContext.getStub();
final Collector<OT> collector = taskContext.getOutputCollector();
IT2 buildSideRecord = this.solutionSideRecord;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
index fe1e0c1..89fbf4d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.operators;
-import org.apache.flink.api.common.functions.GenericMap;
+import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
@@ -31,20 +31,20 @@ import org.apache.flink.util.MutableObjectIterator;
* The MapTask creates an iterator over all key-value pairs of its input and hands that to the <code>map()</code> method
* of the MapFunction.
*
- * @see GenericMap
+ * @see org.apache.flink.api.common.functions.MapFunction
*
* @param <IT> The mapper's input data type.
* @param <OT> The mapper's output data type.
*/
-public class MapDriver<IT, OT> implements PactDriver<GenericMap<IT, OT>, OT> {
+public class MapDriver<IT, OT> implements PactDriver<MapFunction<IT, OT>, OT> {
- private PactTaskContext<GenericMap<IT, OT>, OT> taskContext;
+ private PactTaskContext<MapFunction<IT, OT>, OT> taskContext;
private volatile boolean running;
@Override
- public void setup(PactTaskContext<GenericMap<IT, OT>, OT> context) {
+ public void setup(PactTaskContext<MapFunction<IT, OT>, OT> context) {
this.taskContext = context;
this.running = true;
}
@@ -55,9 +55,9 @@ public class MapDriver<IT, OT> implements PactDriver<GenericMap<IT, OT>, OT> {
}
@Override
- public Class<GenericMap<IT, OT>> getStubType() {
+ public Class<MapFunction<IT, OT>> getStubType() {
@SuppressWarnings("unchecked")
- final Class<GenericMap<IT, OT>> clazz = (Class<GenericMap<IT, OT>>) (Class<?>) GenericMap.class;
+ final Class<MapFunction<IT, OT>> clazz = (Class<MapFunction<IT, OT>>) (Class<?>) MapFunction.class;
return clazz;
}
@@ -75,7 +75,7 @@ public class MapDriver<IT, OT> implements PactDriver<GenericMap<IT, OT>, OT> {
public void run() throws Exception {
// cache references on the stack
final MutableObjectIterator<IT> input = this.taskContext.getInput(0);
- final GenericMap<IT, OT> function = this.taskContext.getStub();
+ final MapFunction<IT, OT> function = this.taskContext.getStub();
final Collector<OT> output = this.taskContext.getOutputCollector();
IT record = this.taskContext.<IT>getInputSerializer(0).getSerializer().createInstance();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
index a29afa2..e205f1e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -42,13 +42,13 @@ import org.apache.flink.util.MutableObjectIterator;
* The MatchTask matches all pairs of records that share the same key and come from different inputs. Each pair of
* matching records is handed to the <code>match()</code> method of the JoinFunction.
*
- * @see GenericJoiner
+ * @see org.apache.flink.api.common.functions.FlatJoinFunction
*/
-public class MatchDriver<IT1, IT2, OT> implements PactDriver<GenericJoiner<IT1, IT2, OT>, OT> {
+public class MatchDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> {
protected static final Log LOG = LogFactory.getLog(MatchDriver.class);
- protected PactTaskContext<GenericJoiner<IT1, IT2, OT>, OT> taskContext;
+ protected PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext;
private volatile JoinTaskIterator<IT1, IT2, OT> matchIterator; // the iterator that does the actual matching
@@ -57,7 +57,7 @@ public class MatchDriver<IT1, IT2, OT> implements PactDriver<GenericJoiner<IT1,
// ------------------------------------------------------------------------
@Override
- public void setup(PactTaskContext<GenericJoiner<IT1, IT2, OT>, OT> context) {
+ public void setup(PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> context) {
this.taskContext = context;
this.running = true;
}
@@ -68,9 +68,9 @@ public class MatchDriver<IT1, IT2, OT> implements PactDriver<GenericJoiner<IT1,
}
@Override
- public Class<GenericJoiner<IT1, IT2, OT>> getStubType() {
+ public Class<FlatJoinFunction<IT1, IT2, OT>> getStubType() {
@SuppressWarnings("unchecked")
- final Class<GenericJoiner<IT1, IT2, OT>> clazz = (Class<GenericJoiner<IT1, IT2, OT>>) (Class<?>) GenericJoiner.class;
+ final Class<FlatJoinFunction<IT1, IT2, OT>> clazz = (Class<FlatJoinFunction<IT1, IT2, OT>>) (Class<?>) FlatJoinFunction.class;
return clazz;
}
@@ -141,7 +141,7 @@ public class MatchDriver<IT1, IT2, OT> implements PactDriver<GenericJoiner<IT1,
@Override
public void run() throws Exception {
- final GenericJoiner<IT1, IT2, OT> matchStub = this.taskContext.getStub();
+ final FlatJoinFunction<IT1, IT2, OT> matchStub = this.taskContext.getStub();
final Collector<OT> collector = this.taskContext.getOutputCollector();
final JoinTaskIterator<IT1, IT2, OT> matchIterator = this.matchIterator;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
index ffe27e6..33d8a18 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.operators;
-import org.apache.flink.api.common.functions.AbstractFunction;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
@@ -28,15 +28,15 @@ import org.apache.flink.util.MutableObjectIterator;
*
* @param <T> The data type.
*/
-public class NoOpDriver<T> implements PactDriver<AbstractFunction, T> {
+public class NoOpDriver<T> implements PactDriver<AbstractRichFunction, T> {
- private PactTaskContext<AbstractFunction, T> taskContext;
+ private PactTaskContext<AbstractRichFunction, T> taskContext;
private volatile boolean running;
@Override
- public void setup(PactTaskContext<AbstractFunction, T> context) {
+ public void setup(PactTaskContext<AbstractRichFunction, T> context) {
this.taskContext = context;
this.running = true;
}
@@ -47,7 +47,7 @@ public class NoOpDriver<T> implements PactDriver<AbstractFunction, T> {
}
@Override
- public Class<AbstractFunction> getStubType() {
+ public Class<AbstractRichFunction> getStubType() {
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
index 4d72085..87cea30 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
@@ -24,7 +24,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.GenericReduce;
+import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
@@ -44,7 +44,7 @@ import org.apache.flink.util.MutableObjectIterator;
*
* @param <T> The data type consumed and produced by the combiner.
*/
-public class ReduceCombineDriver<T> implements PactDriver<GenericReduce<T>, T> {
+public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T> {
private static final Log LOG = LogFactory.getLog(ReduceCombineDriver.class);
@@ -52,13 +52,13 @@ public class ReduceCombineDriver<T> implements PactDriver<GenericReduce<T>, T> {
private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
- private PactTaskContext<GenericReduce<T>, T> taskContext;
+ private PactTaskContext<ReduceFunction<T>, T> taskContext;
private TypeSerializer<T> serializer;
private TypeComparator<T> comparator;
- private GenericReduce<T> reducer;
+ private ReduceFunction<T> reducer;
private Collector<T> output;
@@ -75,7 +75,7 @@ public class ReduceCombineDriver<T> implements PactDriver<GenericReduce<T>, T> {
// ------------------------------------------------------------------------
@Override
- public void setup(PactTaskContext<GenericReduce<T>, T> context) {
+ public void setup(PactTaskContext<ReduceFunction<T>, T> context) {
this.taskContext = context;
this.running = true;
}
@@ -86,9 +86,9 @@ public class ReduceCombineDriver<T> implements PactDriver<GenericReduce<T>, T> {
}
@Override
- public Class<GenericReduce<T>> getStubType() {
+ public Class<ReduceFunction<T>> getStubType() {
@SuppressWarnings("unchecked")
- final Class<GenericReduce<T>> clazz = (Class<GenericReduce<T>>) (Class<?>) GenericReduce.class;
+ final Class<ReduceFunction<T>> clazz = (Class<ReduceFunction<T>>) (Class<?>) ReduceFunction.class;
return clazz;
}
@@ -168,7 +168,7 @@ public class ReduceCombineDriver<T> implements PactDriver<GenericReduce<T>, T> {
final TypeSerializer<T> serializer = this.serializer;
final TypeComparator<T> comparator = this.comparator;
- final GenericReduce<T> function = this.reducer;
+ final ReduceFunction<T> function = this.reducer;
final Collector<T> output = this.output;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
index a7e9305..9495cdc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.GenericReduce;
+import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.operators.util.TaskConfig;
@@ -36,13 +36,13 @@ import org.apache.flink.util.MutableObjectIterator;
* The ReduceTask creates a iterator over all records from its input. The iterator returns all records grouped by their
* key. The iterator is handed to the <code>reduce()</code> method of the ReduceFunction.
*
- * @see GenericReduce
+ * @see org.apache.flink.api.common.functions.ReduceFunction
*/
-public class ReduceDriver<T> implements PactDriver<GenericReduce<T>, T> {
+public class ReduceDriver<T> implements PactDriver<ReduceFunction<T>, T> {
private static final Log LOG = LogFactory.getLog(ReduceDriver.class);
- private PactTaskContext<GenericReduce<T>, T> taskContext;
+ private PactTaskContext<ReduceFunction<T>, T> taskContext;
private MutableObjectIterator<T> input;
@@ -55,7 +55,7 @@ public class ReduceDriver<T> implements PactDriver<GenericReduce<T>, T> {
// ------------------------------------------------------------------------
@Override
- public void setup(PactTaskContext<GenericReduce<T>, T> context) {
+ public void setup(PactTaskContext<ReduceFunction<T>, T> context) {
this.taskContext = context;
this.running = true;
}
@@ -66,9 +66,9 @@ public class ReduceDriver<T> implements PactDriver<GenericReduce<T>, T> {
}
@Override
- public Class<GenericReduce<T>> getStubType() {
+ public Class<ReduceFunction<T>> getStubType() {
@SuppressWarnings("unchecked")
- final Class<GenericReduce<T>> clazz = (Class<GenericReduce<T>>) (Class<?>) GenericReduce.class;
+ final Class<ReduceFunction<T>> clazz = (Class<ReduceFunction<T>>) (Class<?>) ReduceFunction.class;
return clazz;
}
@@ -101,7 +101,7 @@ public class ReduceDriver<T> implements PactDriver<GenericReduce<T>, T> {
final TypeSerializer<T> serializer = this.serializer;
final TypeComparator<T> comparator = this.comparator;
- final GenericReduce<T> function = this.taskContext.getStub();
+ final ReduceFunction<T> function = this.taskContext.getStub();
final Collector<T> output = this.taskContext.getOutputCollector();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 45fe05a..c8f217c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -24,8 +24,9 @@ import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.GenericCombine;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -497,7 +498,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
if (this.stub != null) {
try {
Configuration stubConfig = this.config.getStubParameters();
- this.stub.open(stubConfig);
+ FunctionUtils.openFunction(this.stub, stubConfig);
stubOpen = true;
}
catch (Throwable t) {
@@ -510,7 +511,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
// close. We close here such that a regular close throwing an exception marks a task as failed.
if (this.running && this.stub != null) {
- this.stub.close();
+ FunctionUtils.closeFunction(this.stub);
stubOpen = false;
}
@@ -525,15 +526,17 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
// modify accumulators.ll;
if (this.stub != null) {
// collect the counters from the stub
- Map<String, Accumulator<?,?>> accumulators = this.stub.getRuntimeContext().getAllAccumulators();
- RegularPactTask.reportAndClearAccumulators(getEnvironment(), accumulators, this.chainedTasks);
+ if (FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext) != null) {
+ Map<String, Accumulator<?, ?>> accumulators = FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext).getAllAccumulators();
+ RegularPactTask.reportAndClearAccumulators(getEnvironment(), accumulators, this.chainedTasks);
+ }
}
}
catch (Exception ex) {
// close the input, but do not report any exceptions, since we already have another root cause
if (stubOpen) {
try {
- this.stub.close();
+ FunctionUtils.closeFunction(this.stub);
}
catch (Throwable t) {}
}
@@ -582,9 +585,12 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
// tasks. Type conflicts can occur here if counters with same name but
// different type were used.
+
for (ChainedDriver<?, ?> chainedTask : chainedTasks) {
- Map<String, Accumulator<?, ?>> chainedAccumulators = chainedTask.getStub().getRuntimeContext().getAllAccumulators();
- AccumulatorHelper.mergeInto(accumulators, chainedAccumulators);
+ if (FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null) != null) {
+ Map<String, Accumulator<?, ?>> chainedAccumulators = FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null).getAllAccumulators();
+ AccumulatorHelper.mergeInto(accumulators, chainedAccumulators);
+ }
}
// Don't report if the UDF didn't collect any accumulators
@@ -607,7 +613,9 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
// done before sending
AccumulatorHelper.resetAndClearAccumulators(accumulators);
for (ChainedDriver<?, ?> chainedTask : chainedTasks) {
- AccumulatorHelper.resetAndClearAccumulators(chainedTask.getStub().getRuntimeContext().getAllAccumulators());
+ if (FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null) != null) {
+ AccumulatorHelper.resetAndClearAccumulators(FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null).getAllAccumulators());
+ }
}
}
@@ -693,7 +701,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
throw new RuntimeException("The class '" + stub.getClass().getName() + "' is not a subclass of '" +
stubSuperClass.getName() + "' as is required.");
}
- stub.setRuntimeContext(this.runtimeUdfContext);
+ FunctionUtils.setFunctionRuntimeContext(stub, this.runtimeUdfContext);
return stub;
}
catch (ClassCastException ccex) {
@@ -988,13 +996,13 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
e.getMessage() == null ? "." : ": " + e.getMessage(), e);
}
- if (!(localStub instanceof GenericCombine)) {
+ if (!(localStub instanceof FlatCombineFunction)) {
throw new IllegalStateException("Performing combining sort outside a reduce task!");
}
@SuppressWarnings({ "rawtypes", "unchecked" })
CombiningUnilateralSortMerger<?> cSorter = new CombiningUnilateralSortMerger(
- (GenericCombine) localStub, getMemoryManager(), getIOManager(), this.inputIterators[inputNum],
+ (FlatCombineFunction) localStub, getMemoryManager(), getIOManager(), this.inputIterators[inputNum],
this, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum),
this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum),
this.config.getSpillingThresholdInput(inputNum));
@@ -1375,7 +1383,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
// --------------------------------------------------------------------------------------------
/**
- * Opens the given stub using its {@link Function#open(Configuration)} method. If the open call produces
+ * Opens the given stub using its {@link org.apache.flink.api.common.functions.RichFunction#open(Configuration)} method. If the open call produces
* an exception, a new exception with a standard error message is created, using the encountered exception
* as its cause.
*
@@ -1386,14 +1394,14 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
*/
public static void openUserCode(Function stub, Configuration parameters) throws Exception {
try {
- stub.open(parameters);
+ FunctionUtils.openFunction(stub, parameters);
} catch (Throwable t) {
throw new Exception("The user defined 'open(Configuration)' method in " + stub.getClass().toString() + " caused an exception: " + t.getMessage(), t);
}
}
/**
- * Closes the given stub using its {@link Function#close()} method. If the close call produces
+ * Closes the given stub using its {@link org.apache.flink.api.common.functions.RichFunction#close()} method. If the close call produces
* an exception, a new exception with a standard error message is created, using the encountered exception
* as its cause.
*
@@ -1403,7 +1411,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
*/
public static void closeUserCode(Function stub) throws Exception {
try {
- stub.close();
+ FunctionUtils.closeFunction(stub);
} catch (Throwable t) {
throw new Exception("The user defined 'close()' method caused an exception: " + t.getMessage(), t);
}
@@ -1505,4 +1513,6 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
}
return a;
}
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java
index 3f7ad61..3dbab78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.operators.chaining;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.GenericCollectorMap;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -61,7 +61,7 @@ public class ChainedCollectorMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
// --------------------------------------------------------------------------------------------
- public Function getStub() {
+ public RichFunction getStub() {
return this.mapper;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java
index cca6838..db134a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java
@@ -19,25 +19,26 @@
package org.apache.flink.runtime.operators.chaining;
+import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.GenericFlatMap;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.RegularPactTask;
public class ChainedFlatMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
- private GenericFlatMap<IT, OT> mapper;
+ private FlatMapFunction<IT, OT> mapper;
// --------------------------------------------------------------------------------------------
@Override
public void setup(AbstractInvokable parent) {
@SuppressWarnings("unchecked")
- final GenericFlatMap<IT, OT> mapper =
- RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, GenericFlatMap.class);
+ final FlatMapFunction<IT, OT> mapper =
+ RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, FlatMapFunction.class);
this.mapper = mapper;
- mapper.setRuntimeContext(getUdfRuntimeContext());
+ FunctionUtils.setFunctionRuntimeContext(mapper, getUdfRuntimeContext());
}
@Override
@@ -54,8 +55,9 @@ public class ChainedFlatMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
@Override
public void cancelTask() {
try {
- this.mapper.close();
- } catch (Throwable t) {
+ FunctionUtils.closeFunction(this.mapper);
+ }
+ catch (Throwable t) {
}
}
@@ -84,4 +86,5 @@ public class ChainedFlatMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
public void close() {
this.outputCollector.close();
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java
index 3ae324c..a7b1048 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java
@@ -20,24 +20,25 @@
package org.apache.flink.runtime.operators.chaining;
import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.GenericMap;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.RegularPactTask;
public class ChainedMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
- private GenericMap<IT, OT> mapper;
+ private MapFunction<IT, OT> mapper;
// --------------------------------------------------------------------------------------------
@Override
public void setup(AbstractInvokable parent) {
@SuppressWarnings("unchecked")
- final GenericMap<IT, OT> mapper =
- RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, GenericMap.class);
+ final MapFunction<IT, OT> mapper =
+ RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, MapFunction.class);
this.mapper = mapper;
- mapper.setRuntimeContext(getUdfRuntimeContext());
+ FunctionUtils.setFunctionRuntimeContext(mapper, getUdfRuntimeContext());
}
@Override
@@ -54,7 +55,7 @@ public class ChainedMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
@Override
public void cancelTask() {
try {
- this.mapper.close();
+ FunctionUtils.closeFunction(this.mapper);
} catch (Throwable t) {
}
}
@@ -84,4 +85,5 @@ public class ChainedMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
public void close() {
this.outputCollector.close();
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java
index 76d860b..3a42469 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.operators.chaining;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.operators.base.BulkIterationBase;
import org.apache.flink.api.common.operators.base.BulkIterationBase.TerminationCriterionAggregator;
@@ -47,7 +47,7 @@ public class ChainedTerminationCriterionDriver<IT, OT> extends ChainedDriver<IT,
// --------------------------------------------------------------------------------------------
- public Function getStub() {
+ public RichFunction getStub() {
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
index d5ce0a7..ffac151 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
@@ -22,8 +22,9 @@ package org.apache.flink.runtime.operators.chaining;
import java.io.IOException;
import java.util.List;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.GenericCombine;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -51,7 +52,7 @@ public class SynchronousChainedCombineDriver<T> extends ChainedDriver<T, T> {
private InMemorySorter<T> sorter;
- private GenericCombine<T> combiner;
+ private FlatCombineFunction<T> combiner;
private TypeSerializer<T> serializer;
@@ -72,10 +73,10 @@ public class SynchronousChainedCombineDriver<T> extends ChainedDriver<T, T> {
this.parent = parent;
@SuppressWarnings("unchecked")
- final GenericCombine<T> combiner =
- RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, GenericCombine.class);
+ final FlatCombineFunction<T> combiner =
+ RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, FlatCombineFunction.class);
this.combiner = combiner;
- combiner.setRuntimeContext(getUdfRuntimeContext());
+ FunctionUtils.setFunctionRuntimeContext(combiner, getUdfRuntimeContext());
}
@Override
@@ -185,7 +186,7 @@ public class SynchronousChainedCombineDriver<T> extends ChainedDriver<T, T> {
this.comparator);
// cache references on the stack
- final GenericCombine<T> stub = this.combiner;
+ final FlatCombineFunction<T> stub = this.combiner;
final Collector<T> output = this.outputCollector;
// run stub implementation
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildFirstHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildFirstHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildFirstHashMatchIterator.java
index 65af050..0daf69b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildFirstHashMatchIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildFirstHashMatchIterator.java
@@ -22,7 +22,7 @@ package org.apache.flink.runtime.operators.hash;
import java.io.IOException;
import java.util.List;
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -101,7 +101,7 @@ public class BuildFirstHashMatchIterator<V1, V2, O> implements JoinTaskIterator<
}
@Override
- public final boolean callWithNextKey(GenericJoiner<V1, V2, O> matchFunction, Collector<O> collector)
+ public final boolean callWithNextKey(FlatJoinFunction<V1, V2, O> matchFunction, Collector<O> collector)
throws Exception
{
if (this.hashJoin.nextRecord())
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildSecondHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildSecondHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildSecondHashMatchIterator.java
index e124201..70e5afb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildSecondHashMatchIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildSecondHashMatchIterator.java
@@ -22,7 +22,7 @@ package org.apache.flink.runtime.operators.hash;
import java.io.IOException;
import java.util.List;
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -100,7 +100,7 @@ public class BuildSecondHashMatchIterator<V1, V2, O> implements JoinTaskIterator
}
@Override
- public boolean callWithNextKey(GenericJoiner<V1, V2, O> matchFunction, Collector<O> collector)
+ public boolean callWithNextKey(FlatJoinFunction<V1, V2, O> matchFunction, Collector<O> collector)
throws Exception
{
if (this.hashJoin.nextRecord())
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
index 06d0ac7..da8a11b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
@@ -29,7 +29,8 @@ import java.util.Queue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.GenericCombine;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
@@ -71,7 +72,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
*/
private static final Log LOG = LogFactory.getLog(CombiningUnilateralSortMerger.class);
- private final GenericCombine<E> combineStub; // the user code stub that does the combining
+ private final FlatCombineFunction<E> combineStub; // the user code stub that does the combining
private Configuration udfConfig;
@@ -101,7 +102,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
* @throws MemoryAllocationException Thrown, if not enough memory can be obtained from the memory manager to
* perform the sort.
*/
- public CombiningUnilateralSortMerger(GenericCombine<E> combineStub, MemoryManager memoryManager, IOManager ioManager,
+ public CombiningUnilateralSortMerger(FlatCombineFunction<E> combineStub, MemoryManager memoryManager, IOManager ioManager,
MutableObjectIterator<E> input, AbstractInvokable parentTask,
TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
double memoryFraction, int maxNumFileHandles, float startSpillingFraction)
@@ -133,7 +134,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
* @throws MemoryAllocationException Thrown, if not enough memory can be obtained from the memory manager to
* perform the sort.
*/
- public CombiningUnilateralSortMerger(GenericCombine<E> combineStub, MemoryManager memoryManager, IOManager ioManager,
+ public CombiningUnilateralSortMerger(FlatCombineFunction<E> combineStub, MemoryManager memoryManager, IOManager ioManager,
MutableObjectIterator<E> input, AbstractInvokable parentTask,
TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
double memoryFraction, int numSortBuffers, int maxNumFileHandles,
@@ -254,12 +255,12 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
// ------------------- Spilling Phase ------------------------
- final GenericCombine<E> combineStub = CombiningUnilateralSortMerger.this.combineStub;
+ final FlatCombineFunction<E> combineStub = CombiningUnilateralSortMerger.this.combineStub;
// now that we are actually spilling, take the combiner, and open it
try {
- Configuration conf = CombiningUnilateralSortMerger.this.udfConfig;
- combineStub.open(conf == null ? new Configuration() : conf);
+ Configuration conf = CombiningUnilateralSortMerger.this.udfConfig;
+ FunctionUtils.openFunction(combineStub, (conf == null ? new Configuration() : conf));
}
catch (Throwable t) {
throw new IOException("The user-defined combiner failed in its 'open()' method.", t);
@@ -380,7 +381,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
// close the user code
try {
- combineStub.close();
+ FunctionUtils.closeFunction(combineStub);
}
catch (Throwable t) {
throw new IOException("The user-defined combiner failed in its 'close()' method.", t);
@@ -466,7 +467,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
this.memManager.getPageSize());
final WriterCollector<E> collector = new WriterCollector<E>(output, this.serializer);
- final GenericCombine<E> combineStub = CombiningUnilateralSortMerger.this.combineStub;
+ final FlatCombineFunction<E> combineStub = CombiningUnilateralSortMerger.this.combineStub;
// combine and write to disk
try {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java
index 308e333..2ed75ae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java
@@ -25,7 +25,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -151,7 +151,7 @@ public class MergeMatchIterator<T1, T2, O> implements JoinTaskIterator<T1, T2, O
* @see org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey()
*/
@Override
- public boolean callWithNextKey(final GenericJoiner<T1, T2, O> matchFunction, final Collector<O> collector)
+ public boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
throws Exception
{
if (!this.iterator1.nextKey() || !this.iterator2.nextKey()) {
@@ -234,7 +234,7 @@ public class MergeMatchIterator<T1, T2, O> implements JoinTaskIterator<T1, T2, O
* @throws Exception Forwards all exceptions thrown by the stub.
*/
private void crossFirst1withNValues(final T1 val1, final T2 firstValN,
- final Iterator<T2> valsN, final GenericJoiner<T1, T2, O> matchFunction, final Collector<O> collector)
+ final Iterator<T2> valsN, final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
throws Exception
{
this.copy1 = this.serializer1.copy(val1, this.copy1);
@@ -267,7 +267,7 @@ public class MergeMatchIterator<T1, T2, O> implements JoinTaskIterator<T1, T2, O
* @throws Exception Forwards all exceptions thrown by the stub.
*/
private void crossSecond1withNValues(T2 val1, T1 firstValN,
- Iterator<T1> valsN, GenericJoiner<T1, T2, O> matchFunction, Collector<O> collector)
+ Iterator<T1> valsN, FlatJoinFunction<T1, T2, O> matchFunction, Collector<O> collector)
throws Exception
{
this.copy2 = this.serializer2.copy(val1, this.copy2);
@@ -280,7 +280,7 @@ public class MergeMatchIterator<T1, T2, O> implements JoinTaskIterator<T1, T2, O
if (valsN.hasNext()) {
this.copy2 = this.serializer2.copy(val1, this.copy2);
- matchFunction.join(nRec, this.copy2, collector);
+ matchFunction.join(nRec, this.copy2, collector);
} else {
matchFunction.join(nRec, val1, collector);
more = false;
@@ -297,7 +297,7 @@ public class MergeMatchIterator<T1, T2, O> implements JoinTaskIterator<T1, T2, O
*/
private void crossMwithNValues(final T1 firstV1, Iterator<T1> spillVals,
final T2 firstV2, final Iterator<T2> blockVals,
- final GenericJoiner<T1, T2, O> matchFunction, final Collector<O> collector)
+ final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
throws Exception
{
// ==================================================
@@ -411,7 +411,7 @@ public class MergeMatchIterator<T1, T2, O> implements JoinTaskIterator<T1, T2, O
// get instances of key and block value
final T2 nextBlockVal = this.blockIt.next();
this.copy1 = this.serializer1.copy(nextSpillVal, this.copy1);
- matchFunction.join(this.copy1, nextBlockVal, collector);
+ matchFunction.join(this.copy1, nextBlockVal, collector);
}
// reset block iterator
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/JoinTaskIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/JoinTaskIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/JoinTaskIterator.java
index 6bcb6ee..3ed647d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/JoinTaskIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/JoinTaskIterator.java
@@ -22,7 +22,7 @@ package org.apache.flink.runtime.operators.util;
import java.io.IOException;
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
import org.apache.flink.util.Collector;
@@ -60,7 +60,7 @@ public interface JoinTaskIterator<V1, V2, O>
* @return True, if a next key exists, false if no more keys exist.
* @throws Exception Exceptions from the user code are forwarded.
*/
- boolean callWithNextKey(GenericJoiner<V1, V2, O> matchFunction, Collector<O> collector) throws Exception;
+ boolean callWithNextKey(FlatJoinFunction<V1, V2, O> matchFunction, Collector<O> collector) throws Exception;
/**
* Aborts the matching process. This extra abort method is supplied, because a significant time may pass while
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
index e91e100..3894233 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
@@ -23,13 +23,10 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.java.record.functions.JoinFunction;
import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
import org.apache.flink.api.java.typeutils.runtime.record.RecordPairComparatorFactory;
-import org.apache.flink.runtime.operators.BuildFirstCachedMatchDriver;
-import org.apache.flink.runtime.operators.BuildSecondCachedMatchDriver;
-import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
@@ -43,7 +40,7 @@ import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
-public class CachedMatchTaskTest extends DriverTestBase<GenericJoiner<Record, Record, Record>>
+public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Record, Record>>
{
private static final long HASH_MEM = 6*1024*1024;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
index 46717be..5551485 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
@@ -23,12 +23,9 @@ import java.util.Iterator;
import org.junit.Assert;
-import org.apache.flink.api.common.functions.GenericCoGrouper;
-import org.apache.flink.api.java.record.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
import org.apache.flink.api.java.typeutils.runtime.record.RecordPairComparatorFactory;
-import org.apache.flink.runtime.operators.CoGroupDriver;
-import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
import org.apache.flink.types.IntValue;
@@ -37,7 +34,7 @@ import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
import org.junit.Test;
-public class CoGroupTaskExternalITCase extends DriverTestBase<GenericCoGrouper<Record, Record, Record>>
+public class CoGroupTaskExternalITCase extends DriverTestBase<CoGroupFunction<Record, Record, Record>>
{
private static final long SORT_MEM = 3*1024*1024;
@@ -87,7 +84,7 @@ public class CoGroupTaskExternalITCase extends DriverTestBase<GenericCoGrouper<R
Assert.assertEquals("Wrong result set size.", expCnt, this.output.getNumberOfRecords());
}
- public static final class MockCoGroupStub extends CoGroupFunction {
+ public static final class MockCoGroupStub extends org.apache.flink.api.java.record.functions.CoGroupFunction {
private static final long serialVersionUID = 1L;
private final Record res = new Record();