You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/08/09 14:39:47 UTC
[16/39] [FLINK-701] Refactor Java API to use SAM interfaces.
Introduce RichFunction stubs for all UDFs.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java
index a99ffc8..e57d20c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java
@@ -24,12 +24,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Assert;
-import org.apache.flink.api.common.functions.GenericCoGrouper;
-import org.apache.flink.api.java.record.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
import org.apache.flink.api.java.typeutils.runtime.record.RecordPairComparatorFactory;
-import org.apache.flink.runtime.operators.CoGroupDriver;
-import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.CoGroupTaskExternalITCase.MockCoGroupStub;
import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
@@ -42,7 +39,7 @@ import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
import org.junit.Test;
-public class CoGroupTaskTest extends DriverTestBase<GenericCoGrouper<Record, Record, Record>>
+public class CoGroupTaskTest extends DriverTestBase<CoGroupFunction<Record, Record, Record>>
{
private static final long SORT_MEM = 3*1024*1024;
@@ -403,7 +400,7 @@ public class CoGroupTaskTest extends DriverTestBase<GenericCoGrouper<Record, Rec
Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
}
- public static class MockFailingCoGroupStub extends CoGroupFunction {
+ public static class MockFailingCoGroupStub extends org.apache.flink.api.java.record.functions.CoGroupFunction {
private static final long serialVersionUID = 1L;
private int cnt = 0;
@@ -443,7 +440,7 @@ public class CoGroupTaskTest extends DriverTestBase<GenericCoGrouper<Record, Rec
}
- public static final class MockDelayingCoGroupStub extends CoGroupFunction {
+ public static final class MockDelayingCoGroupStub extends org.apache.flink.api.java.record.functions.CoGroupFunction {
private static final long serialVersionUID = 1L;
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
index 694ae48..885a509 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
@@ -24,10 +24,8 @@ import java.util.HashMap;
import org.junit.Assert;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.GroupReduceCombineDriver;
import org.apache.flink.runtime.operators.CombineTaskTest.MockCombiningReduceStub;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
@@ -37,7 +35,7 @@ import org.apache.flink.types.Record;
import org.junit.Test;
-public class CombineTaskExternalITCase extends DriverTestBase<GenericGroupReduce<Record, ?>> {
+public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFunction<Record, ?>> {
private static final long COMBINE_MEM = 3 * 1024 * 1024;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
index f42c443..99440aa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
@@ -25,12 +25,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Assert;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.record.functions.ReduceFunction;
import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.GroupReduceCombineDriver;
import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
@@ -43,7 +41,7 @@ import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
import org.junit.Test;
-public class CombineTaskTest extends DriverTestBase<GenericGroupReduce<Record, ?>>
+public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Record, ?>>
{
private static final long COMBINE_MEM = 3 * 1024 * 1024;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskExternalITCase.java
index 79d47fe..ced86d1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskExternalITCase.java
@@ -21,9 +21,7 @@ package org.apache.flink.runtime.operators;
import org.junit.Assert;
-import org.apache.flink.api.common.functions.GenericCrosser;
-import org.apache.flink.runtime.operators.CrossDriver;
-import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.runtime.operators.CrossTaskTest.MockCrossStub;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
@@ -31,7 +29,7 @@ import org.apache.flink.types.Record;
import org.junit.Test;
-public class CrossTaskExternalITCase extends DriverTestBase<GenericCrosser<Record, Record, Record>>
+public class CrossTaskExternalITCase extends DriverTestBase<CrossFunction<Record, Record, Record>>
{
private static final long CROSS_MEM = 1024 * 1024;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java
index 01015d3..ee51ad0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java
@@ -23,20 +23,16 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Assert;
-import org.apache.flink.api.common.functions.GenericCrosser;
-import org.apache.flink.api.java.record.functions.CrossFunction;
-import org.apache.flink.runtime.operators.CrossDriver;
-import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
import org.junit.Test;
-public class CrossTaskTest extends DriverTestBase<GenericCrosser<Record, Record, Record>>
+public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record, Record>>
{
private static final long CROSS_MEM = 1024 * 1024;
@@ -583,26 +579,26 @@ public class CrossTaskTest extends DriverTestBase<GenericCrosser<Record, Record,
Assert.assertTrue("Exception was thrown despite proper canceling.", success.get());
}
- public static final class MockCrossStub extends CrossFunction {
+ public static final class MockCrossStub extends org.apache.flink.api.java.record.functions.CrossFunction {
private static final long serialVersionUID = 1L;
-
+
@Override
- public void cross(Record record1, Record record2, Collector<Record> out) {
- out.collect(record1);
+ public Record cross(Record record1, Record record2) throws Exception {
+ return record1;
}
}
- public static final class MockFailingCrossStub extends CrossFunction {
+ public static final class MockFailingCrossStub extends org.apache.flink.api.java.record.functions.CrossFunction {
private static final long serialVersionUID = 1L;
private int cnt = 0;
@Override
- public void cross(Record record1, Record record2, Collector<Record> out) {
+ public Record cross(Record record1, Record record2) {
if (++this.cnt >= 10) {
throw new ExpectedTestException();
}
- out.collect(record1);
+ return record1;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
index 2115bc8..25998f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
@@ -21,12 +21,10 @@ package org.apache.flink.runtime.operators;
import org.junit.Assert;
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.java.record.functions.JoinFunction;
import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
import org.apache.flink.api.java.typeutils.runtime.record.RecordPairComparatorFactory;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.MatchDriver;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
import org.apache.flink.types.IntValue;
@@ -35,7 +33,7 @@ import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
import org.junit.Test;
-public class MatchTaskExternalITCase extends DriverTestBase<GenericJoiner<Record, Record, Record>>
+public class MatchTaskExternalITCase extends DriverTestBase<FlatJoinFunction<Record, Record, Record>>
{
private static final long HASH_MEM = 4*1024*1024;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
index 6eea546..9b7c582 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
@@ -23,12 +23,10 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.java.record.functions.JoinFunction;
import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
import org.apache.flink.api.java.typeutils.runtime.record.RecordPairComparatorFactory;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.MatchDriver;
import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
@@ -42,7 +40,7 @@ import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
-public class MatchTaskTest extends DriverTestBase<GenericJoiner<Record, Record, Record>>
+public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Record, Record>>
{
private static final long HASH_MEM = 6*1024*1024;
@@ -951,11 +949,10 @@ public class MatchTaskTest extends DriverTestBase<GenericJoiner<Record, Record,
private int cnt = 0;
@Override
- public void join(Record record1, Record record2, Collector<Record> out) {
+ public void join(Record record1, Record record2, Collector<Record> out) throws Exception {
if (++this.cnt >= 10) {
throw new ExpectedTestException();
}
-
out.collect(record1);
}
}
@@ -964,10 +961,11 @@ public class MatchTaskTest extends DriverTestBase<GenericJoiner<Record, Record,
private static final long serialVersionUID = 1L;
@Override
- public void join(Record record1, Record record2, Collector<Record> out) {
+ public void join(Record record1, Record record2, Collector<Record> out) throws Exception {
try {
Thread.sleep(100);
- } catch (InterruptedException e) { }
+ } catch (InterruptedException e) {
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
index bf51f41..ebee0b8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
@@ -27,13 +27,11 @@ import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.record.functions.ReduceFunction;
import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
import org.apache.flink.api.java.typeutils.runtime.record.RecordSerializerFactory;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.GroupReduceDriver;
import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
@@ -43,7 +41,7 @@ import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
import org.junit.Test;
-public class ReduceTaskExternalITCase extends DriverTestBase<GenericGroupReduce<Record, Record>>
+public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunction<Record, Record>>
{
private static final Log LOG = LogFactory.getLog(ReduceTaskExternalITCase.class);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
index 0d4e370..b367a7e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
@@ -28,13 +28,11 @@ import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.record.functions.ReduceFunction;
import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
import org.apache.flink.api.java.typeutils.runtime.record.RecordSerializerFactory;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.GroupReduceDriver;
import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
@@ -48,7 +46,7 @@ import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
import org.junit.Test;
-public class ReduceTaskTest extends DriverTestBase<GenericGroupReduce<Record, Record>>
+public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Record, Record>>
{
private static final Log LOG = LogFactory.getLog(ReduceTaskTest.class);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllGroupReduceDriverTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllGroupReduceDriverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllGroupReduceDriverTest.java
index 8c8715e..d603dec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllGroupReduceDriverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllGroupReduceDriverTest.java
@@ -23,8 +23,8 @@ import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.runtime.operators.AllGroupReduceDriver;
@@ -46,8 +46,8 @@ public class AllGroupReduceDriverTest {
@Test
public void testAllReduceDriverImmutableEmpty() {
try {
- TestTaskContext<GenericGroupReduce<Tuple2<String, Integer>, Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
- new TestTaskContext<GenericGroupReduce<Tuple2<String, Integer>, Tuple2<String, Integer>>, Tuple2<String,Integer>>();
+ TestTaskContext<GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
+ new TestTaskContext<GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>, Tuple2<String,Integer>>();
List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
TypeInformation<Tuple2<String, Integer>> typeInfo = TypeExtractor.getForObject(data.get(0));
@@ -72,8 +72,8 @@ public class AllGroupReduceDriverTest {
@Test
public void testAllReduceDriverImmutable() {
try {
- TestTaskContext<GenericGroupReduce<Tuple2<String, Integer>, Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
- new TestTaskContext<GenericGroupReduce<Tuple2<String, Integer>, Tuple2<String, Integer>>, Tuple2<String,Integer>>();
+ TestTaskContext<GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
+ new TestTaskContext<GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>, Tuple2<String,Integer>>();
List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
TypeInformation<Tuple2<String, Integer>> typeInfo = TypeExtractor.getForObject(data.get(0));
@@ -112,8 +112,8 @@ public class AllGroupReduceDriverTest {
@Test
public void testAllReduceDriverMutable() {
try {
- TestTaskContext<GenericGroupReduce<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
- new TestTaskContext<GenericGroupReduce<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>();
+ TestTaskContext<GroupReduceFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
+ new TestTaskContext<GroupReduceFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>();
List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData();
TypeInformation<Tuple2<StringValue, IntValue>> typeInfo = TypeExtractor.getForObject(data.get(0));
@@ -152,7 +152,7 @@ public class AllGroupReduceDriverTest {
// Test UDFs
// --------------------------------------------------------------------------------------------
- public static final class ConcatSumReducer extends GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
+ public static final class ConcatSumReducer extends RichGroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
@Override
public void reduce(Iterator<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) {
@@ -169,7 +169,7 @@ public class AllGroupReduceDriverTest {
}
}
- public static final class ConcatSumMutableReducer extends GroupReduceFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>> {
+ public static final class ConcatSumMutableReducer extends RichGroupReduceFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>> {
@Override
public void reduce(Iterator<Tuple2<StringValue, IntValue>> values, Collector<Tuple2<StringValue, IntValue>> out) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllReduceDriverTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllReduceDriverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllReduceDriverTest.java
index de01d74..b124e51 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllReduceDriverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllReduceDriverTest.java
@@ -22,8 +22,8 @@ package org.apache.flink.runtime.operators.drivers;
import java.util.Arrays;
import java.util.List;
-import org.apache.flink.api.common.functions.GenericReduce;
-import org.apache.flink.api.java.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.RichReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.runtime.operators.AllReduceDriver;
@@ -44,8 +44,8 @@ public class AllReduceDriverTest {
@Test
public void testAllReduceDriverImmutableEmpty() {
try {
- TestTaskContext<GenericReduce<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
- new TestTaskContext<GenericReduce<Tuple2<String,Integer>>, Tuple2<String,Integer>>();
+ TestTaskContext<ReduceFunction<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
+ new TestTaskContext<ReduceFunction<Tuple2<String,Integer>>, Tuple2<String,Integer>>();
List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
TypeInformation<Tuple2<String, Integer>> typeInfo = TypeExtractor.getForObject(data.get(0));
@@ -71,8 +71,8 @@ public class AllReduceDriverTest {
public void testAllReduceDriverImmutable() {
try {
{
- TestTaskContext<GenericReduce<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
- new TestTaskContext<GenericReduce<Tuple2<String,Integer>>, Tuple2<String,Integer>>();
+ TestTaskContext<ReduceFunction<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
+ new TestTaskContext<ReduceFunction<Tuple2<String,Integer>>, Tuple2<String,Integer>>();
List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
TypeInformation<Tuple2<String, Integer>> typeInfo = TypeExtractor.getForObject(data.get(0));
@@ -103,8 +103,8 @@ public class AllReduceDriverTest {
}
{
- TestTaskContext<GenericReduce<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
- new TestTaskContext<GenericReduce<Tuple2<String,Integer>>, Tuple2<String,Integer>>();
+ TestTaskContext<ReduceFunction<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
+ new TestTaskContext<ReduceFunction<Tuple2<String,Integer>>, Tuple2<String,Integer>>();
List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
TypeInformation<Tuple2<String, Integer>> typeInfo = TypeExtractor.getForObject(data.get(0));
@@ -145,8 +145,8 @@ public class AllReduceDriverTest {
public void testAllReduceDriverMutable() {
try {
{
- TestTaskContext<GenericReduce<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
- new TestTaskContext<GenericReduce<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>();
+ TestTaskContext<ReduceFunction<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
+ new TestTaskContext<ReduceFunction<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>();
List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData();
TypeInformation<Tuple2<StringValue, IntValue>> typeInfo = TypeExtractor.getForObject(data.get(0));
@@ -176,8 +176,8 @@ public class AllReduceDriverTest {
Assert.assertEquals(78, res.f1.getValue());
}
{
- TestTaskContext<GenericReduce<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
- new TestTaskContext<GenericReduce<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>();
+ TestTaskContext<ReduceFunction<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
+ new TestTaskContext<ReduceFunction<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>();
List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData();
TypeInformation<Tuple2<StringValue, IntValue>> typeInfo = TypeExtractor.getForObject(data.get(0));
@@ -217,7 +217,7 @@ public class AllReduceDriverTest {
// Test UDFs
// --------------------------------------------------------------------------------------------
- public static final class ConcatSumFirstReducer extends ReduceFunction<Tuple2<String, Integer>> {
+ public static final class ConcatSumFirstReducer extends RichReduceFunction<Tuple2<String, Integer>> {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {
@@ -227,7 +227,7 @@ public class AllReduceDriverTest {
}
}
- public static final class ConcatSumSecondReducer extends ReduceFunction<Tuple2<String, Integer>> {
+ public static final class ConcatSumSecondReducer extends RichReduceFunction<Tuple2<String, Integer>> {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {
@@ -237,7 +237,7 @@ public class AllReduceDriverTest {
}
}
- public static final class ConcatSumFirstMutableReducer extends ReduceFunction<Tuple2<StringValue, IntValue>> {
+ public static final class ConcatSumFirstMutableReducer extends RichReduceFunction<Tuple2<StringValue, IntValue>> {
@Override
public Tuple2<StringValue, IntValue> reduce(Tuple2<StringValue, IntValue> value1, Tuple2<StringValue, IntValue> value2) {
@@ -247,7 +247,7 @@ public class AllReduceDriverTest {
}
}
- public static final class ConcatSumSecondMutableReducer extends ReduceFunction<Tuple2<StringValue, IntValue>> {
+ public static final class ConcatSumSecondMutableReducer extends RichReduceFunction<Tuple2<StringValue, IntValue>> {
@Override
public Tuple2<StringValue, IntValue> reduce(Tuple2<StringValue, IntValue> value1, Tuple2<StringValue, IntValue> value2) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java
index e5a01d6..a325e33 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java
@@ -22,9 +22,9 @@ package org.apache.flink.runtime.operators.drivers;
import java.util.Iterator;
import java.util.List;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -46,8 +46,8 @@ public class GroupReduceDriverTest {
@Test
public void testAllReduceDriverImmutableEmpty() {
try {
- TestTaskContext<GenericGroupReduce<Tuple2<String, Integer>, Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
- new TestTaskContext<GenericGroupReduce<Tuple2<String, Integer>, Tuple2<String, Integer>>, Tuple2<String,Integer>>();
+ TestTaskContext<GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
+ new TestTaskContext<GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>, Tuple2<String,Integer>>();
List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
TupleTypeInfo<Tuple2<String, Integer>> typeInfo = (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(data.get(0));
@@ -74,8 +74,8 @@ public class GroupReduceDriverTest {
@Test
public void testAllReduceDriverImmutable() {
try {
- TestTaskContext<GenericGroupReduce<Tuple2<String, Integer>, Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
- new TestTaskContext<GenericGroupReduce<Tuple2<String, Integer>, Tuple2<String, Integer>>, Tuple2<String,Integer>>();
+ TestTaskContext<GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
+ new TestTaskContext<GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>, Tuple2<String,Integer>>();
List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
TupleTypeInfo<Tuple2<String, Integer>> typeInfo = (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(data.get(0));
@@ -110,8 +110,8 @@ public class GroupReduceDriverTest {
@Test
public void testAllReduceDriverMutable() {
try {
- TestTaskContext<GenericGroupReduce<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
- new TestTaskContext<GenericGroupReduce<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>();
+ TestTaskContext<GroupReduceFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
+ new TestTaskContext<GroupReduceFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>();
List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData();
TupleTypeInfo<Tuple2<StringValue, IntValue>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, IntValue>>) TypeExtractor.getForObject(data.get(0));
@@ -149,7 +149,7 @@ public class GroupReduceDriverTest {
// Test UDFs
// --------------------------------------------------------------------------------------------
- public static final class ConcatSumReducer extends GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
+ public static final class ConcatSumReducer extends RichGroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
@Override
public void reduce(Iterator<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) {
@@ -166,7 +166,7 @@ public class GroupReduceDriverTest {
}
}
- public static final class ConcatSumMutableReducer extends GroupReduceFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>> {
+ public static final class ConcatSumMutableReducer extends RichGroupReduceFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>> {
@Override
public void reduce(Iterator<Tuple2<StringValue, IntValue>> values, Collector<Tuple2<StringValue, IntValue>> out) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceCombineDriverTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceCombineDriverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceCombineDriverTest.java
index 734e119..ae9c294 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceCombineDriverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceCombineDriverTest.java
@@ -22,9 +22,9 @@ package org.apache.flink.runtime.operators.drivers;
import java.util.Collections;
import java.util.List;
-import org.apache.flink.api.common.functions.GenericReduce;
+import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.java.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.RichReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -44,8 +44,8 @@ public class ReduceCombineDriverTest {
@Test
public void testImmutableEmpty() {
try {
- TestTaskContext<GenericReduce<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
- new TestTaskContext<GenericReduce<Tuple2<String,Integer>>, Tuple2<String,Integer>>(1024 * 1024);
+ TestTaskContext<ReduceFunction<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
+ new TestTaskContext<ReduceFunction<Tuple2<String,Integer>>, Tuple2<String,Integer>>(1024 * 1024);
context.getTaskConfig().setRelativeMemoryDriver(0.5);
List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
@@ -81,8 +81,8 @@ public class ReduceCombineDriverTest {
public void testReduceDriverImmutable() {
try {
{
- TestTaskContext<GenericReduce<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
- new TestTaskContext<GenericReduce<Tuple2<String,Integer>>, Tuple2<String,Integer>>(1024 * 1024);
+ TestTaskContext<ReduceFunction<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
+ new TestTaskContext<ReduceFunction<Tuple2<String,Integer>>, Tuple2<String,Integer>>(1024 * 1024);
context.getTaskConfig().setRelativeMemoryDriver(0.5);
List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
@@ -112,8 +112,8 @@ public class ReduceCombineDriverTest {
}
{
- TestTaskContext<GenericReduce<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
- new TestTaskContext<GenericReduce<Tuple2<String,Integer>>, Tuple2<String,Integer>>(1024 * 1024);
+ TestTaskContext<ReduceFunction<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
+ new TestTaskContext<ReduceFunction<Tuple2<String,Integer>>, Tuple2<String,Integer>>(1024 * 1024);
context.getTaskConfig().setRelativeMemoryDriver(0.5);
List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
@@ -153,8 +153,8 @@ public class ReduceCombineDriverTest {
public void testReduceDriverMutable() {
try {
{
- TestTaskContext<GenericReduce<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
- new TestTaskContext<GenericReduce<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>(1024 * 1024);
+ TestTaskContext<ReduceFunction<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
+ new TestTaskContext<ReduceFunction<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>(1024 * 1024);
context.getTaskConfig().setRelativeMemoryDriver(0.5);
List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData();
@@ -181,8 +181,8 @@ public class ReduceCombineDriverTest {
DriverTestData.compareTupleArrays(expected, res);
}
{
- TestTaskContext<GenericReduce<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
- new TestTaskContext<GenericReduce<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>(1024 * 1024);
+ TestTaskContext<ReduceFunction<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
+ new TestTaskContext<ReduceFunction<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>(1024 * 1024);
context.getTaskConfig().setRelativeMemoryDriver(0.5);
List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData();
@@ -220,7 +220,7 @@ public class ReduceCombineDriverTest {
// Test UDFs
// --------------------------------------------------------------------------------------------
- public static final class ConcatSumFirstReducer extends ReduceFunction<Tuple2<String, Integer>> {
+ public static final class ConcatSumFirstReducer extends RichReduceFunction<Tuple2<String, Integer>> {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {
@@ -230,7 +230,7 @@ public class ReduceCombineDriverTest {
}
}
- public static final class ConcatSumSecondReducer extends ReduceFunction<Tuple2<String, Integer>> {
+ public static final class ConcatSumSecondReducer extends RichReduceFunction<Tuple2<String, Integer>> {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {
@@ -240,7 +240,7 @@ public class ReduceCombineDriverTest {
}
}
- public static final class ConcatSumFirstMutableReducer extends ReduceFunction<Tuple2<StringValue, IntValue>> {
+ public static final class ConcatSumFirstMutableReducer extends RichReduceFunction<Tuple2<StringValue, IntValue>> {
@Override
public Tuple2<StringValue, IntValue> reduce(Tuple2<StringValue, IntValue> value1, Tuple2<StringValue, IntValue> value2) {
@@ -250,7 +250,7 @@ public class ReduceCombineDriverTest {
}
}
- public static final class ConcatSumSecondMutableReducer extends ReduceFunction<Tuple2<StringValue, IntValue>> {
+ public static final class ConcatSumSecondMutableReducer extends RichReduceFunction<Tuple2<StringValue, IntValue>> {
@Override
public Tuple2<StringValue, IntValue> reduce(Tuple2<StringValue, IntValue> value1, Tuple2<StringValue, IntValue> value2) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceDriverTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceDriverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceDriverTest.java
index e8370cf..28217b4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceDriverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceDriverTest.java
@@ -21,9 +21,9 @@ package org.apache.flink.runtime.operators.drivers;
import java.util.List;
-import org.apache.flink.api.common.functions.GenericReduce;
+import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.java.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.RichReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -43,8 +43,8 @@ public class ReduceDriverTest {
@Test
public void testReduceDriverImmutableEmpty() {
try {
- TestTaskContext<GenericReduce<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
- new TestTaskContext<GenericReduce<Tuple2<String,Integer>>, Tuple2<String,Integer>>();
+ TestTaskContext<ReduceFunction<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
+ new TestTaskContext<ReduceFunction<Tuple2<String,Integer>>, Tuple2<String,Integer>>();
List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
TupleTypeInfo<Tuple2<String, Integer>> typeInfo = (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(data.get(0));
@@ -76,8 +76,8 @@ public class ReduceDriverTest {
public void testReduceDriverImmutable() {
try {
{
- TestTaskContext<GenericReduce<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
- new TestTaskContext<GenericReduce<Tuple2<String,Integer>>, Tuple2<String,Integer>>();
+ TestTaskContext<ReduceFunction<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
+ new TestTaskContext<ReduceFunction<Tuple2<String,Integer>>, Tuple2<String,Integer>>();
List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
TupleTypeInfo<Tuple2<String, Integer>> typeInfo = (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(data.get(0));
@@ -104,8 +104,8 @@ public class ReduceDriverTest {
}
{
- TestTaskContext<GenericReduce<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
- new TestTaskContext<GenericReduce<Tuple2<String,Integer>>, Tuple2<String,Integer>>();
+ TestTaskContext<ReduceFunction<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
+ new TestTaskContext<ReduceFunction<Tuple2<String,Integer>>, Tuple2<String,Integer>>();
List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
TupleTypeInfo<Tuple2<String, Integer>> typeInfo = (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(data.get(0));
@@ -142,8 +142,8 @@ public class ReduceDriverTest {
public void testReduceDriverMutable() {
try {
{
- TestTaskContext<GenericReduce<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
- new TestTaskContext<GenericReduce<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>();
+ TestTaskContext<ReduceFunction<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
+ new TestTaskContext<ReduceFunction<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>();
List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData();
TupleTypeInfo<Tuple2<StringValue, IntValue>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, IntValue>>) TypeExtractor.getForObject(data.get(0));
@@ -169,8 +169,8 @@ public class ReduceDriverTest {
DriverTestData.compareTupleArrays(expected, res);
}
{
- TestTaskContext<GenericReduce<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
- new TestTaskContext<GenericReduce<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>();
+ TestTaskContext<ReduceFunction<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
+ new TestTaskContext<ReduceFunction<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>();
List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData();
TupleTypeInfo<Tuple2<StringValue, IntValue>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, IntValue>>) TypeExtractor.getForObject(data.get(0));
@@ -206,7 +206,7 @@ public class ReduceDriverTest {
// Test UDFs
// --------------------------------------------------------------------------------------------
- public static final class ConcatSumFirstReducer extends ReduceFunction<Tuple2<String, Integer>> {
+ public static final class ConcatSumFirstReducer extends RichReduceFunction<Tuple2<String, Integer>> {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {
@@ -216,7 +216,7 @@ public class ReduceDriverTest {
}
}
- public static final class ConcatSumSecondReducer extends ReduceFunction<Tuple2<String, Integer>> {
+ public static final class ConcatSumSecondReducer extends RichReduceFunction<Tuple2<String, Integer>> {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {
@@ -226,7 +226,7 @@ public class ReduceDriverTest {
}
}
- public static final class ConcatSumFirstMutableReducer extends ReduceFunction<Tuple2<StringValue, IntValue>> {
+ public static final class ConcatSumFirstMutableReducer extends RichReduceFunction<Tuple2<StringValue, IntValue>> {
@Override
public Tuple2<StringValue, IntValue> reduce(Tuple2<StringValue, IntValue> value1, Tuple2<StringValue, IntValue> value2) {
@@ -236,7 +236,7 @@ public class ReduceDriverTest {
}
}
- public static final class ConcatSumSecondMutableReducer extends ReduceFunction<Tuple2<StringValue, IntValue>> {
+ public static final class ConcatSumSecondMutableReducer extends RichReduceFunction<Tuple2<StringValue, IntValue>> {
@Override
public Tuple2<StringValue, IntValue> reduce(Tuple2<StringValue, IntValue> value1, Tuple2<StringValue, IntValue> value2) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorITCase.java
index 303d921..ac8cc78 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorITCase.java
@@ -26,8 +26,8 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -39,8 +39,6 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.hash.BuildFirstHashMatchIterator;
-import org.apache.flink.runtime.operators.hash.BuildSecondHashMatchIterator;
import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.TestData;
@@ -414,7 +412,7 @@ public class HashMatchIteratorITCase {
collectIntPairData(input1),
collectRecordData(input2));
- final GenericJoiner<IntPair, Record, Record> matcher = new RecordIntPairMatchRemovingMatcher(expectedMatchesMap);
+ final FlatJoinFunction<IntPair, Record, Record> matcher = new RecordIntPairMatchRemovingMatcher(expectedMatchesMap);
final Collector<Record> collector = new DiscardingOutputCollector<Record>();
// reset the generators
@@ -425,7 +423,7 @@ public class HashMatchIteratorITCase {
// compare with iterator values
BuildSecondHashMatchIterator<IntPair, Record, Record> iterator =
new BuildSecondHashMatchIterator<IntPair, Record, Record>(
- input1, input2, this.pairSerializer, this.pairComparator,
+ input1, input2, this.pairSerializer, this.pairComparator,
this.recordSerializer, this.record2Comparator, this.pairRecordPairComparator,
this.memoryManager, this.ioManager, this.parentTask, 1.0);
@@ -461,7 +459,7 @@ public class HashMatchIteratorITCase {
collectIntPairData(input1),
collectRecordData(input2));
- final GenericJoiner<IntPair, Record, Record> matcher = new RecordIntPairMatchRemovingMatcher(expectedMatchesMap);
+ final FlatJoinFunction<IntPair, Record, Record> matcher = new RecordIntPairMatchRemovingMatcher(expectedMatchesMap);
final Collector<Record> collector = new DiscardingOutputCollector<Record>();
// reset the generators
@@ -675,7 +673,7 @@ public class HashMatchIteratorITCase {
}
@Override
- public void join(Record rec1, Record rec2, Collector<Record> out)
+ public void join(Record rec1, Record rec2, Collector<Record> out) throws Exception
{
TestData.Key key = rec1.getField(0, TestData.Key.class);
TestData.Value value1 = rec1.getField(1, TestData.Value.class);
@@ -695,7 +693,7 @@ public class HashMatchIteratorITCase {
}
}
- static final class RecordIntPairMatchRemovingMatcher extends AbstractFunction implements GenericJoiner<IntPair, Record, Record>
+ static final class RecordIntPairMatchRemovingMatcher extends AbstractRichFunction implements FlatJoinFunction<IntPair, Record, Record>
{
private final Map<TestData.Key, Collection<RecordIntPairMatch>> toRemoveFrom;
@@ -704,7 +702,7 @@ public class HashMatchIteratorITCase {
}
@Override
- public void join(IntPair rec1, Record rec2, Collector<Record> out)
+ public void join(IntPair rec1, Record rec2, Collector<Record> out) throws Exception
{
final int k = rec1.getKey();
final int v = rec1.getValue();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeMatchIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeMatchIteratorITCase.java
index e2de4cb..93e52f6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeMatchIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeMatchIteratorITCase.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
import org.apache.flink.runtime.operators.sort.MergeIterator;
import org.apache.flink.runtime.operators.sort.MergeMatchIterator;
import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
@@ -353,8 +354,7 @@ public class SortMergeMatchIteratorITCase {
}
@Override
- public void join(Record rec1, Record rec2, Collector<Record> out)
- {
+ public void join(Record rec1, Record rec2, Collector<Record> out) throws Exception {
TestData.Key key = rec1.getField(0, TestData.Key.class);
TestData.Value value1 = rec1.getField(1, TestData.Value.class);
TestData.Value value2 = rec2.getField(1, TestData.Value.class);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index 9c177a6..e6661ec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -25,6 +25,8 @@ import java.util.List;
import org.junit.Assert;
import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
@@ -173,7 +175,7 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
// open stub implementation
try {
- this.stub.open(getTaskConfig().getStubParameters());
+ FunctionUtils.openFunction(this.stub, getTaskConfig().getStubParameters());
stubOpen = true;
}
catch (Throwable t) {
@@ -185,7 +187,7 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
// close. We close here such that a regular close throwing an exception marks a task as failed.
if (this.running) {
- this.stub.close();
+ FunctionUtils.closeFunction (this.stub);
stubOpen = false;
}
@@ -195,7 +197,7 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
// close the input, but do not report any exceptions, since we already have another root cause
if (stubOpen) {
try {
- this.stub.close();
+ FunctionUtils.closeFunction(this.stub);
}
catch (Throwable t) {}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
index faa87b5..779640d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators.testutils;
import java.util.List;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.api.java.record.io.DelimitedInputFormat;
@@ -80,10 +80,10 @@ public abstract class TaskTestBase {
return this.mockEnv.getTaskConfiguration();
}
- public void registerTask(AbstractInvokable task, @SuppressWarnings("rawtypes") Class<? extends PactDriver> driver, Class<? extends Function> stubClass) {
+ public void registerTask(AbstractInvokable task, @SuppressWarnings("rawtypes") Class<? extends PactDriver> driver, Class<? extends RichFunction> stubClass) {
final TaskConfig config = new TaskConfig(this.mockEnv.getTaskConfiguration());
config.setDriver(driver);
- config.setStubWrapper(new UserCodeClassWrapper<Function>(stubClass));
+ config.setStubWrapper(new UserCodeClassWrapper<RichFunction>(stubClass));
task.setEnvironment(this.mockEnv);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
index 92b6c44..8b891a9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
import org.apache.flink.runtime.operators.hash.BuildFirstHashMatchIterator;
import org.apache.flink.runtime.operators.hash.BuildSecondHashMatchIterator;
import org.apache.flink.runtime.operators.sort.MergeMatchIterator;
@@ -250,6 +251,7 @@ public class HashVsSortMiniBenchmark {
private static final long serialVersionUID = 1L;
@Override
- public void join(Record rec1, Record rec2, Collector<Record> out) {}
+ public void join(Record rec1, Record rec2, Collector<Record> out) throws Exception {
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-scala/src/main/scala/org/apache/flink/api/scala/functions/CrossFunction.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/functions/CrossFunction.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/functions/CrossFunction.scala
index 6dfd1e3..c292100 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/functions/CrossFunction.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/functions/CrossFunction.scala
@@ -45,7 +45,7 @@ abstract class CrossFunctionBase[LeftIn: UDT, RightIn: UDT, Out: UDT] extends JC
}
abstract class CrossFunction[LeftIn: UDT, RightIn: UDT, Out: UDT] extends CrossFunctionBase[LeftIn, RightIn, Out] with Function2[LeftIn, RightIn, Out] {
- override def cross(leftRecord: Record, rightRecord: Record, out: Collector[Record]) = {
+ override def cross(leftRecord: Record, rightRecord: Record) : Record = {
val left = leftDeserializer.deserializeRecyclingOn(leftRecord)
val right = rightDeserializer.deserializeRecyclingOn(rightRecord)
val output = apply(left, right)
@@ -59,31 +59,7 @@ abstract class CrossFunction[LeftIn: UDT, RightIn: UDT, Out: UDT] extends CrossF
leftRecord.copyFrom(leftRecord, leftForwardFrom, leftForwardTo)
serializer.serialize(output, leftRecord)
- out.collect(leftRecord)
- }
-}
-
-abstract class FlatCrossFunction[LeftIn: UDT, RightIn: UDT, Out: UDT] extends CrossFunctionBase[LeftIn, RightIn, Out] with Function2[LeftIn, RightIn, Iterator[Out]] {
- override def cross(leftRecord: Record, rightRecord: Record, out: Collector[Record]) = {
- val left = leftDeserializer.deserializeRecyclingOn(leftRecord)
- val right = rightDeserializer.deserializeRecyclingOn(rightRecord)
- val output = apply(left, right)
-
- if (output.nonEmpty) {
-
- leftRecord.setNumFields(outputLength)
-
- for (field <- leftDiscard)
- leftRecord.setNull(field)
-
- leftRecord.copyFrom(rightRecord, rightForwardFrom, rightForwardTo)
- leftRecord.copyFrom(leftRecord, leftForwardFrom, leftForwardTo)
-
- for (item <- output) {
- serializer.serialize(item, leftRecord)
- out.collect(leftRecord)
- }
- }
+ leftRecord
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/CrossOperator.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/CrossOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/CrossOperator.scala
index 17df2c3..aa04093 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/CrossOperator.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/CrossOperator.scala
@@ -27,7 +27,7 @@ import java.util.{ Iterator => JIterator }
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.analysis._
-import org.apache.flink.api.scala.functions.{CrossFunctionBase, CrossFunction, FlatCrossFunction}
+import org.apache.flink.api.scala.functions.{CrossFunctionBase, CrossFunction}//, FlatCrossFunction}
import org.apache.flink.api.scala.codegen.{MacroContextHolder, Util}
import org.apache.flink.api.scala.functions.DeserializingIterator
import org.apache.flink.api.scala.DataSet
@@ -42,7 +42,7 @@ import org.apache.flink.configuration.Configuration
class CrossDataSet[LeftIn, RightIn](val leftInput: DataSet[LeftIn], val rightInput: DataSet[RightIn]) {
def map[Out](fun: (LeftIn, RightIn) => Out): DataSet[Out] with TwoInputHintable[LeftIn, RightIn, Out] = macro CrossMacros.map[LeftIn, RightIn, Out]
- def flatMap[Out](fun: (LeftIn, RightIn) => Iterator[Out]): DataSet[Out] with TwoInputHintable[LeftIn, RightIn, Out] = macro CrossMacros.flatMap[LeftIn, RightIn, Out]
+ //def flatMap[Out](fun: (LeftIn, RightIn) => Iterator[Out]): DataSet[Out] with TwoInputHintable[LeftIn, RightIn, Out] = macro CrossMacros.flatMap[LeftIn, RightIn, Out]
def filter(fun: (LeftIn, RightIn) => Boolean): DataSet[(LeftIn, RightIn)] with TwoInputHintable[LeftIn, RightIn, (LeftIn, RightIn)] = macro CrossMacros.filter[LeftIn, RightIn]
}
@@ -65,7 +65,7 @@ object CrossMacros {
implicit val rightInputUDT: UDT[RightIn] = c.Expr[UDT[RightIn]](createUdtRightIn).splice
implicit val outputUDT: UDT[Out] = c.Expr[UDT[Out]](createUdtOut).splice
new CrossFunctionBase[LeftIn, RightIn, Out] {
- override def cross(leftRecord: Record, rightRecord: Record, out: Collector[Record]) = {
+ override def cross(leftRecord: Record, rightRecord: Record) : Record = {
val left = leftDeserializer.deserializeRecyclingOn(leftRecord)
val right = rightDeserializer.deserializeRecyclingOn(rightRecord)
val output = fun.splice.apply(left, right)
@@ -79,7 +79,7 @@ object CrossMacros {
leftRecord.copyFrom(leftRecord, leftForwardFrom, leftForwardTo)
serializer.serialize(output, leftRecord)
- out.collect(leftRecord)
+ leftRecord
}
}
}
@@ -106,67 +106,6 @@ object CrossMacros {
return result
}
- def flatMap[LeftIn: c.WeakTypeTag, RightIn: c.WeakTypeTag, Out: c.WeakTypeTag](c: Context { type PrefixType = CrossDataSet[LeftIn, RightIn] })(fun: c.Expr[(LeftIn, RightIn) => Iterator[Out]]): c.Expr[DataSet[Out] with TwoInputHintable[LeftIn, RightIn, Out]] = {
- import c.universe._
-
- val slave = MacroContextHolder.newMacroHelper(c)
-
- val (udtLeftIn, createUdtLeftIn) = slave.mkUdtClass[LeftIn]
- val (udtRightIn, createUdtRightIn) = slave.mkUdtClass[RightIn]
- val (udtOut, createUdtOut) = slave.mkUdtClass[Out]
-
- val stub: c.Expr[CrossFunctionBase[LeftIn, RightIn, Out]] = if (fun.actualType <:< weakTypeOf[CrossFunction[LeftIn, RightIn, Out]])
- reify { fun.splice.asInstanceOf[CrossFunctionBase[LeftIn, RightIn, Out]] }
- else reify {
- implicit val leftInputUDT: UDT[LeftIn] = c.Expr[UDT[LeftIn]](createUdtLeftIn).splice
- implicit val rightInputUDT: UDT[RightIn] = c.Expr[UDT[RightIn]](createUdtRightIn).splice
- implicit val outputUDT: UDT[Out] = c.Expr[UDT[Out]](createUdtOut).splice
- new CrossFunctionBase[LeftIn, RightIn, Out] {
- override def cross(leftRecord: Record, rightRecord: Record, out: Collector[Record]) = {
- val left = leftDeserializer.deserializeRecyclingOn(leftRecord)
- val right = rightDeserializer.deserializeRecyclingOn(rightRecord)
- val output = fun.splice.apply(left, right)
-
- if (output.nonEmpty) {
-
- leftRecord.setNumFields(outputLength)
-
- for (field <- leftDiscard)
- leftRecord.setNull(field)
-
- leftRecord.copyFrom(rightRecord, rightForwardFrom, rightForwardTo)
- leftRecord.copyFrom(leftRecord, leftForwardFrom, leftForwardTo)
-
- for (item <- output) {
- serializer.serialize(item, leftRecord)
- out.collect(leftRecord)
- }
- }
- }
- }
- }
- val contract = reify {
- val helper: CrossDataSet[LeftIn, RightIn] = c.prefix.splice
- val leftInput = helper.leftInput.contract
- val rightInput = helper.rightInput.contract
- val generatedStub = ClosureCleaner.clean(stub.splice)
- val builder = CrossOperator.builder(generatedStub).input1(leftInput).input2(rightInput)
-
- val ret = new CrossOperator(builder) with TwoInputScalaOperator[LeftIn, RightIn, Out] {
- override def getUDF = generatedStub.udf
- override def annotations = Seq(
- Annotations.getConstantFieldsFirst(
- Util.filterNonForwards(getUDF.getLeftForwardIndexArrayFrom, getUDF.getLeftForwardIndexArrayTo)),
- Annotations.getConstantFieldsSecond(
- Util.filterNonForwards(getUDF.getRightForwardIndexArrayFrom, getUDF.getRightForwardIndexArrayTo)))
- }
- new DataSet[Out](ret) with TwoInputHintable[LeftIn, RightIn, Out] {}
- }
-
- val result = c.Expr[DataSet[Out] with TwoInputHintable[LeftIn, RightIn, Out]](Block(List(udtLeftIn, udtRightIn, udtOut), contract.tree))
-
- return result
- }
def filter[LeftIn: c.WeakTypeTag, RightIn: c.WeakTypeTag](c: Context { type PrefixType = CrossDataSet[LeftIn, RightIn] })(fun: c.Expr[(LeftIn, RightIn) => Boolean]): c.Expr[DataSet[(LeftIn, RightIn)] with TwoInputHintable[LeftIn, RightIn, (LeftIn, RightIn)]] = {
import c.universe._
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/IterateOperators.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/IterateOperators.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/IterateOperators.scala
index 759b444..66d94ba 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/IterateOperators.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/IterateOperators.scala
@@ -33,7 +33,7 @@ import org.apache.flink.api.scala.analysis.UDF0
import org.apache.flink.api.scala.analysis.FieldSelector
import org.apache.flink.configuration.Configuration
-import org.apache.flink.api.common.functions.AbstractFunction
+import org.apache.flink.api.common.functions.AbstractRichFunction
import org.apache.flink.api.java.record.operators.BulkIteration
import org.apache.flink.api.common.operators.base.BulkIterationBase
import org.apache.flink.api.java.record.operators.DeltaIteration
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java
index f450483..ccdd52e 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java
@@ -25,7 +25,7 @@ import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.base.GenericDataSourceBase;
@@ -166,7 +166,7 @@ public abstract class CompilerTestBase {
}
@SuppressWarnings("unchecked")
- public <T extends PlanNode> T getNode(String name, Class<? extends Function> stubClass) {
+ public <T extends PlanNode> T getNode(String name, Class<? extends RichFunction> stubClass) {
List<PlanNode> nodes = this.map.get(name);
if (nodes == null || nodes.isEmpty()) {
throw new RuntimeException("No node found with the given name and stub class.");
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/OperatorResolver.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/OperatorResolver.java b/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/OperatorResolver.java
index de27390..28bdd01 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/OperatorResolver.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/OperatorResolver.java
@@ -27,7 +27,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.java.record.operators.BulkIteration;
import org.apache.flink.api.java.record.operators.DeltaIteration;
@@ -63,7 +63,7 @@ public class OperatorResolver implements Visitor<Operator<?>> {
}
@SuppressWarnings("unchecked")
- public <T extends Operator<?>> T getNode(String name, Class<? extends Function> stubClass) {
+ public <T extends Operator<?>> T getNode(String name, Class<? extends RichFunction> stubClass) {
List<Operator<?>> nodes = this.map.get(name);
if (nodes == null || nodes.isEmpty()) {
throw new RuntimeException("No node found with the given name and stub class.");
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index e6ce7c1..950e78e 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -35,6 +35,7 @@ under the License.
<packaging>jar</packaging>
+
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
@@ -184,7 +185,6 @@ under the License.
</execution>
</executions>
</plugin>
-
</plugins>
<pluginManagement>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
index 15cfac3..e0a96b3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
@@ -23,9 +23,9 @@ import static org.junit.Assert.fail;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.JoinFunction;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.RichJoinFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.operators.DriverStrategy;
@@ -106,7 +106,7 @@ public class MultipleJoinsWithSolutionSetCompilerTest extends CompilerTestBase {
return result;
}
- public static final class SummingJoin extends JoinFunction<Tuple2<Long, Double>, Tuple2<Long, Double>, Tuple2<Long, Double>> {
+ public static final class SummingJoin extends RichJoinFunction<Tuple2<Long, Double>, Tuple2<Long, Double>, Tuple2<Long, Double>> {
@Override
public Tuple2<Long, Double> join(Tuple2<Long, Double> first, Tuple2<Long, Double> second) {
@@ -114,7 +114,7 @@ public class MultipleJoinsWithSolutionSetCompilerTest extends CompilerTestBase {
}
}
- public static final class SummingJoinProject extends JoinFunction<Tuple3<Long, Double, Double>, Tuple2<Long, Double>, Tuple2<Long, Double>> {
+ public static final class SummingJoinProject extends RichJoinFunction<Tuple3<Long, Double, Double>, Tuple2<Long, Double>, Tuple2<Long, Double>> {
@Override
public Tuple2<Long, Double> join(Tuple3<Long, Double, Double> first, Tuple2<Long, Double> second) {
@@ -122,7 +122,7 @@ public class MultipleJoinsWithSolutionSetCompilerTest extends CompilerTestBase {
}
}
- public static final class Duplicator extends FlatMapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {
+ public static final class Duplicator extends RichFlatMapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {
@Override
public void flatMap(Tuple2<Long, Double> value, Collector<Tuple2<Long, Double>> out) {
@@ -131,7 +131,7 @@ public class MultipleJoinsWithSolutionSetCompilerTest extends CompilerTestBase {
}
}
- public static final class Expander extends MapFunction<Tuple2<Long, Double>, Tuple3<Long, Double, Double>> {
+ public static final class Expander extends RichMapFunction<Tuple2<Long, Double>, Tuple3<Long, Double, Double>> {
@Override
public Tuple3<Long, Double, Double> map(Tuple2<Long, Double> value) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java
index 857f321..35aea4b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java
@@ -24,7 +24,7 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.util.Collector;
@@ -61,7 +61,7 @@ public class BulkIterationWithAllReducerITCase extends JavaProgramTestBase {
}
- public static class PickOneAllReduce extends GroupReduceFunction<Integer, Integer> {
+ public static class PickOneAllReduce extends RichGroupReduceFunction<Integer, Integer> {
private Integer bcValue;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
index 31540dc..49bdb26 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
@@ -23,9 +23,9 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import org.apache.flink.api.java.functions.CoGroupFunction;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichCoGroupFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsFirst;
import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsSecond;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -90,7 +90,7 @@ public class CoGroupConnectedComponentsSecondITCase extends JavaProgramTestBase
// The test program
// --------------------------------------------------------------------------------------------
- public static final class VertexParser extends MapFunction<String, Long> {
+ public static final class VertexParser extends RichMapFunction<String, Long> {
@Override
public Long map(String value) throws Exception {
@@ -98,7 +98,7 @@ public class CoGroupConnectedComponentsSecondITCase extends JavaProgramTestBase
}
}
- public static final class EdgeParser extends FlatMapFunction<String, Tuple2<Long, Long>> {
+ public static final class EdgeParser extends RichFlatMapFunction<String, Tuple2<Long, Long>> {
@Override
public void flatMap(String value, Collector<Tuple2<Long, Long>> out) throws Exception {
@@ -113,7 +113,7 @@ public class CoGroupConnectedComponentsSecondITCase extends JavaProgramTestBase
@ConstantFieldsFirst("0")
@ConstantFieldsSecond("0")
- public static final class MinIdAndUpdate extends CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+ public static final class MinIdAndUpdate extends RichCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
@Override
public void coGroup(Iterator<Tuple2<Long, Long>> candidates, Iterator<Tuple2<Long, Long>> current, Collector<Tuple2<Long, Long>> out) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
index be24662..d2669aa 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
@@ -23,9 +23,9 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.JoinFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.RichJoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.test.util.JavaProgramTestBase;
@@ -153,8 +153,7 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
}
}
- public static final class FindCandidatesJoin extends JoinFunction
- <Tuple2<Long, Long>, Tuple2<Long, Long>, Long> {
+ public static final class FindCandidatesJoin extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long> {
private static final long serialVersionUID = 1L;
@@ -166,7 +165,7 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
}
}
- public static final class RemoveDuplicatesReduce extends GroupReduceFunction<Long, Long> {
+ public static final class RemoveDuplicatesReduce extends RichGroupReduceFunction<Long, Long> {
private static final long serialVersionUID = 1L;
@@ -176,8 +175,7 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
}
}
- public static final class FindCandidatesDependenciesJoin extends JoinFunction
- <Long, Tuple2<Long, Long>,Tuple2<Long, Long>> {
+ public static final class FindCandidatesDependenciesJoin extends RichJoinFunction<Long, Tuple2<Long, Long>,Tuple2<Long, Long>> {
private static final long serialVersionUID = 1L;
@@ -187,8 +185,7 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
}
}
- public static final class NeighborWithComponentIDJoin extends JoinFunction
- <Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+ public static final class NeighborWithComponentIDJoin extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
private static final long serialVersionUID = 1L;
@@ -201,8 +198,7 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
}
}
- public static final class MinimumReduce extends GroupReduceFunction
- <Tuple2<Long, Long>, Tuple2<Long, Long>> {
+ public static final class MinimumReduce extends RichGroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
private static final long serialVersionUID = 1L;
final Tuple2<Long, Long> resultVertex = new Tuple2<Long, Long>();
@@ -228,8 +224,7 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
}
}
- public static final class MinimumIdFilter extends FlatMapFunction
- <Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
+ public static final class MinimumIdFilter extends RichFlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
index d719cc5..9ea33d7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
@@ -28,8 +28,8 @@ import org.junit.Assert;
import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
import org.apache.flink.api.common.aggregators.LongSumAggregator;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
@@ -287,7 +287,7 @@ public class AggregatorsITCase extends JavaProgramTestBase {
}
@SuppressWarnings("serial")
- public static final class SubtractOneMap extends MapFunction<Integer, Integer> {
+ public static final class SubtractOneMap extends RichMapFunction<Integer, Integer> {
private LongSumAggregator aggr;
@@ -309,7 +309,7 @@ public class AggregatorsITCase extends JavaProgramTestBase {
}
@SuppressWarnings("serial")
- public static final class SubtractOneMapWithParam extends MapFunction<Integer, Integer> {
+ public static final class SubtractOneMapWithParam extends RichMapFunction<Integer, Integer> {
private LongSumAggregatorWithParameter aggr;
@@ -345,7 +345,7 @@ public class AggregatorsITCase extends JavaProgramTestBase {
}
@SuppressWarnings("serial")
- public static final class TupleMakerMap extends MapFunction<Integer, Tuple2<Integer, Integer>> {
+ public static final class TupleMakerMap extends RichMapFunction<Integer, Tuple2<Integer, Integer>> {
@Override
public Tuple2<Integer, Integer> map(Integer value) throws Exception {
@@ -357,7 +357,7 @@ public class AggregatorsITCase extends JavaProgramTestBase {
}
@SuppressWarnings("serial")
- public static final class AggregateMapDelta extends MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
+ public static final class AggregateMapDelta extends RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
private LongSumAggregator aggr;
private LongValue previousAggr;
@@ -388,8 +388,8 @@ public class AggregatorsITCase extends JavaProgramTestBase {
}
@SuppressWarnings("serial")
- public static final class UpdateFilter extends FlatMapFunction<Tuple2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>,
- Tuple2<Integer, Integer>> {
+ public static final class UpdateFilter extends RichFlatMapFunction<Tuple2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>,
+ Tuple2<Integer, Integer>> {
private int superstep;
@@ -411,7 +411,7 @@ public class AggregatorsITCase extends JavaProgramTestBase {
}
@SuppressWarnings("serial")
- public static final class ProjectSecondMapper extends MapFunction<Tuple2<Integer, Integer>, Integer> {
+ public static final class ProjectSecondMapper extends RichMapFunction<Tuple2<Integer, Integer>, Integer> {
@Override
public Integer map(Tuple2<Integer, Integer> value) {
@@ -420,7 +420,7 @@ public class AggregatorsITCase extends JavaProgramTestBase {
}
@SuppressWarnings("serial")
- public static final class AggregateMapDeltaWithParam extends MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
+ public static final class AggregateMapDeltaWithParam extends RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
private LongSumAggregatorWithParameter aggr;
private LongValue previousAggr;