You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/05/05 23:03:42 UTC
[3/4] flink git commit: [FLINK-1682] Ported optimizer unit tests from
Record API to Java API
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
index f42eb02..41e0eb9 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.translation.JavaPlan;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.optimizer.dataproperties.GlobalProperties;
@@ -52,7 +53,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
.where(0,1).equalTo(0,1).with(new MockJoin());
- joined.print();
+ joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -73,7 +74,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
.where(0,1).equalTo(2,1).with(new MockJoin());
- joined.print();
+ joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -96,7 +97,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
.where(0,1).equalTo(0,1).with(new MockJoin());
- joined.print();
+ joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -119,7 +120,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
.where(0,1).equalTo(2,1).with(new MockJoin());
- joined.print();
+ joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -142,7 +143,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
.where(0,1).equalTo(2,1).with(new MockJoin());
- joined.print();
+ joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -164,7 +165,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
.where(0,1).equalTo(2,1).with(new MockJoin());
- joined.print();
+ joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -187,7 +188,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
.where(0,1).equalTo(2,1).with(new MockJoin());
- joined.print();
+ joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -212,7 +213,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
.where(0,1).equalTo(0,1).with(new MockJoin());
- joined.print();
+ joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -238,7 +239,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
.where(0,1).equalTo(2,1).with(new MockJoin());
- joined.print();
+ joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -263,7 +264,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
.where(0,1).equalTo(2,1).with(new MockJoin());
- joined.print();
+ joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -288,7 +289,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
.where(0,2).equalTo(2,1).with(new MockJoin());
- joined.print();
+ joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -313,7 +314,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
.where(0,2).equalTo(2,1).with(new MockJoin());
- joined.print();
+ joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -338,7 +339,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
.where(0,2).equalTo(1,2).with(new MockJoin());
- joined.print();
+ joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -363,7 +364,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
.where(0,2).equalTo(1,2).with(new MockJoin());
- joined.print();
+ joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -384,7 +385,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.coGroup(set2)
.where(0,1).equalTo(0,1).with(new MockCoGroup());
- coGrouped.print();
+ coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -405,7 +406,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.coGroup(set2)
.where(0,1).equalTo(2,1).with(new MockCoGroup());
- coGrouped.print();
+ coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -428,7 +429,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.coGroup(set2)
.where(0,1).equalTo(0,1).with(new MockCoGroup());
- coGrouped.print();
+ coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -451,7 +452,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.coGroup(set2)
.where(0,1).equalTo(2,1).with(new MockCoGroup());
- coGrouped.print();
+ coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -473,7 +474,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.withForwardedFields("2;1"))
.where(0,1).equalTo(2, 1).with(new MockCoGroup());
- coGrouped.print();
+ coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -495,7 +496,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.coGroup(set2)
.where(0, 1).equalTo(2, 1).with(new MockCoGroup());
- coGrouped.print();
+ coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -517,7 +518,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.withForwardedFields("2"))
.where(0,1).equalTo(2,1).with(new MockCoGroup());
- coGrouped.print();
+ coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -541,7 +542,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.withForwardedFields("0;1"))
.where(0, 1).equalTo(0, 1).with(new MockCoGroup());
- coGrouped.print();
+ coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -566,7 +567,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.withForwardedFields("1;2"))
.where(0, 1).equalTo(2, 1).with(new MockCoGroup());
- coGrouped.print();
+ coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -590,7 +591,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.withForwardedFields("2;1"))
.where(0, 1).equalTo(2, 1).with(new MockCoGroup());
- coGrouped.print();
+ coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -614,7 +615,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.withForwardedFields("1"))
.where(0, 2).equalTo(2, 1).with(new MockCoGroup());
- coGrouped.print();
+ coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -638,7 +639,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.withForwardedFields("1"))
.where(0, 2).equalTo(2, 1).with(new MockCoGroup());
- coGrouped.print();
+ coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -662,7 +663,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.withForwardedFields("2"))
.where(0, 2).equalTo(1, 2).with(new MockCoGroup());
- coGrouped.print();
+ coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -686,7 +687,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.withForwardedFields("1"))
.where(0, 2).equalTo(1, 2).with(new MockCoGroup());
- coGrouped.print();
+ coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
index 84f6377..68e8a41 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.optimizer;
import static org.junit.Assert.*;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
import org.apache.flink.optimizer.util.CompilerTestBase;
@@ -50,7 +52,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
.map(new IdentityMapper<Long>())
.withBroadcastSet(source, "bc");
- result.print();
+ result.output(new DiscardingOutputFormat<Long>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -84,7 +86,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
.withBroadcastSet(bcInput1, "bc1")
.withBroadcastSet(bcInput2, "bc2");
- result.print();
+ result.output(new DiscardingOutputFormat<Long>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -123,7 +125,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
.withBroadcastSet(bcInput1, "bc1");
- iteration.closeWith(result).print();
+ iteration.closeWith(result).output(new DiscardingOutputFormat<Long>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -154,7 +156,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
initialSource
.map(new IdentityMapper<Long>())
.cross(initialSource).withParameters(conf)
- .print();
+ .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
Plan p = env.createProgramPlan();
@@ -176,7 +178,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
initialSource
.map(new IdentityMapper<Long>())
.cross(initialSource).withParameters(conf)
- .print();
+ .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
Plan p = env.createProgramPlan();
@@ -199,7 +201,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
initialSource
.map(new IdentityMapper<Long>())
.cross(initialSource).withParameters(conf)
- .print();
+ .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
Plan p = env.createProgramPlan();
@@ -222,7 +224,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
initialSource
.map(new IdentityMapper<Long>())
.cross(initialSource).withParameters(conf)
- .print();
+ .output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
Plan p = env.createProgramPlan();
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
index d6b9444..dc9f2e5 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.util.FieldSet;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.translation.JavaPlan;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -64,7 +65,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
data.getSplitDataProperties()
.splitsPartitionedBy(0);
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple2<Long,String>>());
JavaPlan plan = env.createProgramPlan();
@@ -97,7 +98,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
data.getSplitDataProperties()
.splitsPartitionedBy(1, 0);
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple2<Long,String>>());
JavaPlan plan = env.createProgramPlan();
@@ -129,7 +130,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
data.getSplitDataProperties()
.splitsPartitionedBy("*");
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple3<Long,SomePojo,String>>());
JavaPlan plan = env.createProgramPlan();
@@ -161,7 +162,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
data.getSplitDataProperties()
.splitsPartitionedBy("f1");
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple3<Long,SomePojo,String>>());
JavaPlan plan = env.createProgramPlan();
@@ -193,7 +194,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
data.getSplitDataProperties()
.splitsPartitionedBy("f1.stringField");
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -225,7 +226,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
data.getSplitDataProperties()
.splitsPartitionedBy("f1.intField; f2");
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -258,7 +259,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
data.getSplitDataProperties()
.splitsPartitionedBy("byDate", 1, 0);
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -293,7 +294,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
.splitsPartitionedBy(0)
.splitsGroupedBy(0);
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -327,7 +328,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
.splitsPartitionedBy(0)
.splitsGroupedBy(1, 0);
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -362,7 +363,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
.splitsPartitionedBy(1)
.splitsGroupedBy(0);
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -396,7 +397,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
.splitsPartitionedBy(0, 1)
.splitsGroupedBy(0);
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -429,7 +430,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
.splitsPartitionedBy("f2")
.splitsGroupedBy("f2");
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -463,7 +464,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
.splitsPartitionedBy("f1.intField")
.splitsGroupedBy("f0; f1.intField");
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -497,7 +498,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
.splitsPartitionedBy("f1.intField")
.splitsGroupedBy("f1");
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -530,7 +531,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
.splitsPartitionedBy("f1")
.splitsGroupedBy("f1.stringField");
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -565,7 +566,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
.splitsPartitionedBy(1)
.splitsOrderedBy(new int[]{1}, new Order[]{Order.ASCENDING});
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -599,7 +600,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
.splitsPartitionedBy(1)
.splitsOrderedBy(new int[]{1, 0}, new Order[]{Order.ASCENDING, Order.DESCENDING});
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -634,7 +635,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
.splitsPartitionedBy(0)
.splitsOrderedBy(new int[]{1}, new Order[]{Order.ASCENDING});
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -668,7 +669,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
.splitsPartitionedBy(0, 1)
.splitsOrderedBy(new int[]{1}, new Order[]{Order.DESCENDING});
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -701,7 +702,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
.splitsPartitionedBy("f1.intField")
.splitsOrderedBy("f0; f1.intField", new Order[]{Order.ASCENDING, Order.DESCENDING});
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -735,7 +736,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
.splitsPartitionedBy("f1.intField")
.splitsOrderedBy("f1", new Order[]{Order.DESCENDING});
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -768,7 +769,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
.splitsPartitionedBy("f1")
.splitsOrderedBy("f1.stringField", new Order[]{Order.ASCENDING});
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -808,7 +809,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
data2.getSplitDataProperties()
.splitsPartitionedBy("byDate", 0);
- data1.union(data2).print();
+ data1.union(data2).output(new DiscardingOutputFormat<Tuple2<Long, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -856,7 +857,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
data2.getSplitDataProperties()
.splitsPartitionedBy("byDate", 0);
- data1.union(data2).print();
+ data1.union(data2).output(new DiscardingOutputFormat<Tuple2<Long, String>>());
JavaPlan plan = env.createProgramPlan();
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
index 3af64fc..b0dca66 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
@@ -20,36 +20,35 @@ package org.apache.flink.optimizer;
import static org.junit.Assert.fail;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityReduce;
import org.junit.Test;
/**
* This test case has been created to validate a bug that occurred when
* the ReduceOperator was used without a grouping key.
*/
-@SuppressWarnings({"serial", "deprecation"})
+@SuppressWarnings({"serial"})
public class ReduceAllTest extends CompilerTestBase {
@Test
public void testReduce() {
// construct the plan
- FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
- ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce()).name("Reduce1").input(source).build();
- FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
- sink.setInput(reduce1);
- Plan plan = new Plan(sink, "AllReduce Test");
- plan.setDefaultParallelism(DEFAULT_PARALLELISM);
-
-
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+ DataSet<Long> set1 = env.generateSequence(0,1);
+
+ set1.reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce1")
+ .output(new DiscardingOutputFormat<Long>()).name("Sink");
+
+ JavaPlan plan = env.createProgramPlan();
+
try {
OptimizedPlan oPlan = compileNoStats(plan);
JobGraphGenerator jobGen = new JobGraphGenerator();
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
index 25643a4..26af380 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
@@ -44,7 +44,7 @@ import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
-@SuppressWarnings({"serial", "deprecation"})
+@SuppressWarnings({"serial", "unchecked"})
public class ReplicatingDataSourceTest extends CompilerTestBase {
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
index 3a24ce1..00ada2a 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
import org.apache.flink.api.common.operators.util.FieldSet;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.translation.JavaPlan;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.optimizer.dataproperties.GlobalProperties;
@@ -57,7 +58,7 @@ public class SemanticPropertiesAPIToPlanTest extends CompilerTestBase {
.groupBy(1)
.reduce(new MockReducer()).withForwardedFields("*");
- set.print();
+ set.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -118,7 +119,7 @@ public class SemanticPropertiesAPIToPlanTest extends CompilerTestBase {
.reduce(new MockReducer()).withForwardedFields("f1->f2");
DataSet<Tuple3<Integer, Integer, Integer>> out = in1.join(in2).where(1).equalTo(2).with(new MockJoin());
- out.print();
+ out.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java
index 65e5025..a94f845 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.optimizer;
import static org.junit.Assert.*;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.junit.Test;
import org.apache.flink.api.common.Plan;
@@ -30,7 +31,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
@@ -51,12 +52,12 @@ public class SortPartialReuseTest extends CompilerTestBase {
.map(new IdentityMapper<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
.groupBy(0, 1)
- .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
+ .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
.groupBy(0)
- .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>())
-
- .print();
+ .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>())
+
+ .output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -96,12 +97,12 @@ public class SortPartialReuseTest extends CompilerTestBase {
.map(new IdentityMapper<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
.groupBy(0, 1)
- .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
+ .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
.groupBy(1)
- .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>())
-
- .print();
+ .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>())
+
+ .output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
index 1001626..f041b2a 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.*;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.optimizer.plan.BinaryUnionPlanNode;
import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
@@ -50,8 +51,8 @@ public class UnionBetweenDynamicAndStaticPathTest extends CompilerTestBase {
DataSet<Long> result = iteration.closeWith(
input2.union(input2).union(iteration.union(iteration)));
- result.print();
- result.print();
+ result.output(new DiscardingOutputFormat<Long>());
+ result.output(new DiscardingOutputFormat<Long>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -102,8 +103,8 @@ public class UnionBetweenDynamicAndStaticPathTest extends CompilerTestBase {
DataSet<Long> iterResult = iteration
.closeWith(iteration.union(iteration).union(input2.union(input2)));
- iterResult.print();
- iterResult.print();
+ iterResult.output(new DiscardingOutputFormat<Long>());
+ iterResult.output(new DiscardingOutputFormat<Long>());
Plan p = env.createProgramPlan();
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
index 2e52565..fee6e17 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
@@ -26,13 +26,12 @@ import org.apache.flink.api.common.operators.base.FlatMapOperatorBase;
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.types.IntValue;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Visitor;
import org.junit.Assert;
@@ -45,37 +44,26 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.PlanNode;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityReduce;
-@SuppressWarnings({"serial", "deprecation"})
+@SuppressWarnings({"serial"})
public class UnionPropertyPropagationTest extends CompilerTestBase {
- @SuppressWarnings("unchecked")
@Test
- public void testUnionPropertyOldApiPropagation() {
+ public void testUnion1() {
// construct the plan
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+ DataSet<Long> sourceA = env.generateSequence(0,1);
+ DataSet<Long> sourceB = env.generateSequence(0,1);
- FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), IN_FILE);
- FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), IN_FILE);
-
- ReduceOperator redA = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
- .input(sourceA)
- .build();
- ReduceOperator redB = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
- .input(sourceB)
- .build();
-
- ReduceOperator globalRed = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).build();
- globalRed.addInput(redA);
- globalRed.addInput(redB);
-
- FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, globalRed);
-
- // return the plan
- Plan plan = new Plan(sink, "Union Property Propagation");
+ DataSet<Long> redA = sourceA.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>());
+ DataSet<Long> redB = sourceB.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>());
+
+ redA.union(redB).groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
+ .output(new DiscardingOutputFormat<Long>());
+
+ JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileNoStats(plan);
@@ -88,7 +76,7 @@ public class UnionPropertyPropagationTest extends CompilerTestBase {
@Override
public boolean preVisit(PlanNode visitable) {
- if (visitable instanceof SingleInputPlanNode && visitable.getProgramOperator() instanceof ReduceOperator) {
+ if (visitable instanceof SingleInputPlanNode && visitable.getProgramOperator() instanceof GroupReduceOperatorBase) {
for (Channel inConn : visitable.getInputs()) {
Assert.assertTrue("Reduce should just forward the input if it is already partitioned",
inConn.getShipStrategy() == ShipStrategyType.FORWARD);
@@ -107,7 +95,7 @@ public class UnionPropertyPropagationTest extends CompilerTestBase {
}
@Test
- public void testUnionNewApiAssembly() {
+ public void testUnion2() {
final int NUM_INPUTS = 4;
// construct the plan it will be multiple flat maps, all unioned
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
index e81e0ec..65dd2b3 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.optimizer;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.optimizer.util.CompilerTestBase;
@@ -40,8 +41,8 @@ public class UnionReplacementTest extends CompilerTestBase {
DataSet<String> union = input1.union(input2);
- union.print();
- union.print();
+ union.output(new DiscardingOutputFormat<String>());
+ union.output(new DiscardingOutputFormat<String>());
Plan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileNoStats(plan);
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
index 321ca5a..32bd6e9 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
@@ -24,11 +24,13 @@ import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.junit.Test;
@@ -40,12 +42,13 @@ public class WorksetIterationCornerCasesTest extends CompilerTestBase {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 100).map(new Duplicator<Long>());
+ DataSet<Tuple2<Long, Long>> input = env.readCsvFile("/tmp/some.csv").types(Long.class, Long.class);
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = input.iterateDelta(input, 100, 1);
- DataSet<Tuple2<Long, Long>> iterEnd = iteration.getWorkset().map(new TestMapper<Tuple2<Long,Long>>());
- iteration.closeWith(iterEnd, iterEnd).print();
+ DataSet<Tuple2<Long, Long>> iterEnd = iteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>());
+ iteration.closeWith(iterEnd, iterEnd)
+ .output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -61,18 +64,5 @@ public class WorksetIterationCornerCasesTest extends CompilerTestBase {
fail(e.getMessage());
}
}
-
- private static final class Duplicator<T> implements MapFunction<T, Tuple2<T, T>> {
- @Override
- public Tuple2<T, T> map(T value) {
- return new Tuple2<T, T>(value, value);
- }
- }
-
- private static final class TestMapper<T> implements MapFunction<T, T> {
- @Override
- public T map(T value) {
- return value;
- }
- }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
index 27f367f..46b9357 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
@@ -25,25 +25,21 @@ import static org.junit.Assert.fail;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.record.operators.DeltaIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.JoinOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyMatchStub;
-import org.apache.flink.optimizer.util.DummyNonPreservingMatchStub;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityMap;
-import org.apache.flink.optimizer.util.IdentityReduce;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityJoiner;
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.types.LongValue;
import org.junit.Test;
@@ -51,7 +47,6 @@ import org.junit.Test;
* Tests that validate optimizer choices when using operators that are requesting certain specific execution
* strategies.
*/
-@SuppressWarnings("deprecation")
public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
private static final long serialVersionUID = 1L;
@@ -66,7 +61,7 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
@Test
public void testRecordApiWithDeferredSoltionSetUpdateWithMapper() {
- Plan plan = getRecordTestPlan(false, true);
+ Plan plan = getTestPlan(false, true);
OptimizedPlan oPlan;
try {
@@ -112,7 +107,7 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
@Test
public void testRecordApiWithDeferredSoltionSetUpdateWithNonPreservingJoin() {
- Plan plan = getRecordTestPlan(false, false);
+ Plan plan = getTestPlan(false, false);
OptimizedPlan oPlan;
try {
@@ -156,7 +151,7 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
@Test
public void testRecordApiWithDirectSoltionSetUpdate() {
- Plan plan = getRecordTestPlan(true, false);
+ Plan plan = getTestPlan(true, false);
OptimizedPlan oPlan;
try {
@@ -197,52 +192,45 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
new JobGraphGenerator().compileJobGraph(oPlan);
}
- private Plan getRecordTestPlan(boolean joinPreservesSolutionSet, boolean mapBeforeSolutionDelta) {
- FileDataSource solutionSetInput = new FileDataSource(new DummyInputFormat(), IN_FILE, "Solution Set");
- FileDataSource worksetInput = new FileDataSource(new DummyInputFormat(), IN_FILE, "Workset");
-
- FileDataSource invariantInput = new FileDataSource(new DummyInputFormat(), IN_FILE, "Invariant Input");
-
- DeltaIteration iteration = new DeltaIteration(0, ITERATION_NAME);
- iteration.setInitialSolutionSet(solutionSetInput);
- iteration.setInitialWorkset(worksetInput);
- iteration.setMaximumNumberOfIterations(100);
-
- JoinOperator joinWithInvariant = JoinOperator.builder(new DummyMatchStub(), LongValue.class, 0, 0)
- .input1(iteration.getWorkset())
- .input2(invariantInput)
- .name(JOIN_WITH_INVARIANT_NAME)
- .build();
-
- JoinOperator joinWithSolutionSet = JoinOperator.builder(
- joinPreservesSolutionSet ? new DummyMatchStub() : new DummyNonPreservingMatchStub(), LongValue.class, 0, 0)
- .input1(iteration.getSolutionSet())
- .input2(joinWithInvariant)
- .name(JOIN_WITH_SOLUTION_SET)
- .build();
-
- ReduceOperator nextWorkset = ReduceOperator.builder(new IdentityReduce(), LongValue.class, 0)
- .input(joinWithSolutionSet)
- .name(NEXT_WORKSET_REDUCER_NAME)
- .build();
-
- if (mapBeforeSolutionDelta) {
- MapOperator mapper = MapOperator.builder(new IdentityMap())
- .input(joinWithSolutionSet)
- .name(SOLUTION_DELTA_MAPPER_NAME)
- .build();
- iteration.setSolutionSetDelta(mapper);
- } else {
- iteration.setSolutionSetDelta(joinWithSolutionSet);
+ private Plan getTestPlan(boolean joinPreservesSolutionSet, boolean mapBeforeSolutionDelta) {
+
+ // construct the plan
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+ DataSet<Tuple2<Long, Long>> solSetInput = env.readCsvFile("/tmp/sol.csv").types(Long.class, Long.class).name("Solution Set");
+ DataSet<Tuple2<Long, Long>> workSetInput = env.readCsvFile("/tmp/sol.csv").types(Long.class, Long.class).name("Workset");
+ DataSet<Tuple2<Long, Long>> invariantInput = env.readCsvFile("/tmp/sol.csv").types(Long.class, Long.class).name("Invariant Input");
+
+ DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> deltaIt = solSetInput.iterateDelta(workSetInput, 100, 0).name(ITERATION_NAME);
+
+ DataSet<Tuple2<Long, Long>> join1 = deltaIt.getWorkset().join(invariantInput).where(0).equalTo(0)
+ .with(new IdentityJoiner<Tuple2<Long, Long>>())
+ .withForwardedFieldsFirst("*").name(JOIN_WITH_INVARIANT_NAME);
+
+ DataSet<Tuple2<Long, Long>> join2 = deltaIt.getSolutionSet().join(join1).where(0).equalTo(0)
+ .with(new IdentityJoiner<Tuple2<Long, Long>>())
+ .name(JOIN_WITH_SOLUTION_SET);
+ if(joinPreservesSolutionSet) {
+ ((JoinOperator<?,?,?>)join2).withForwardedFieldsFirst("*");
}
-
- iteration.setNextWorkset(nextWorkset);
- FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, iteration, "Sink");
-
- Plan plan = new Plan(sink);
- plan.setDefaultParallelism(DEFAULT_PARALLELISM);
- return plan;
+ DataSet<Tuple2<Long, Long>> nextWorkset = join2.groupBy(0).reduceGroup(new IdentityGroupReducer<Tuple2<Long, Long>>())
+ .withForwardedFields("*").name(NEXT_WORKSET_REDUCER_NAME);
+
+ if(mapBeforeSolutionDelta) {
+
+ DataSet<Tuple2<Long, Long>> mapper = join2.map(new IdentityMapper<Tuple2<Long, Long>>())
+ .withForwardedFields("*").name(SOLUTION_DELTA_MAPPER_NAME);
+
+ deltaIt.closeWith(mapper, nextWorkset)
+ .output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
+ }
+ else {
+ deltaIt.closeWith(join2, nextWorkset)
+ .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+ }
+
+ return env.createProgramPlan();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
index d52181d..346e702 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
@@ -246,7 +246,7 @@ public class CoGroupCustomPartitioningTest extends CompilerTestBase {
.distinct(0, 1)
.groupBy(1)
.sortGroup(0, Order.ASCENDING)
- .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1");
+ .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1");
grouped
.coGroup(partitioned).where(0).equalTo(0)
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
index 5758c86..17a7659 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.junit.Test;
@@ -57,7 +57,7 @@ public class CustomPartitioningGlobalOptimizationTest extends CompilerTestBase {
.withPartitioner(partitioner);
joined.groupBy(1).withPartitioner(partitioner)
- .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>())
+ .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>())
.print();
Plan p = env.createProgramPlan();
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
index 0408ca9..23f4812 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.testfunctions.DummyReducer;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.junit.Test;
@@ -84,7 +84,7 @@ public class GroupingKeySelectorTranslationTest extends CompilerTestBase {
data.groupBy(new TestKeySelector<Tuple2<Integer,Integer>>())
.withPartitioner(new TestPartitionerInt())
- .reduceGroup(new IdentityGroupReducer<Tuple2<Integer,Integer>>())
+ .reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Integer,Integer>>())
.print();
Plan p = env.createProgramPlan();
@@ -115,7 +115,7 @@ public class GroupingKeySelectorTranslationTest extends CompilerTestBase {
data.groupBy(new TestKeySelector<Tuple3<Integer,Integer,Integer>>())
.withPartitioner(new TestPartitionerInt())
.sortGroup(new TestKeySelector<Tuple3<Integer, Integer, Integer>>(), Order.ASCENDING)
- .reduceGroup(new IdentityGroupReducer<Tuple3<Integer,Integer,Integer>>())
+ .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Integer,Integer,Integer>>())
.print();
Plan p = env.createProgramPlan();
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
index 74e5c8c..54033ac 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.testfunctions.DummyReducer;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.junit.Test;
@@ -76,7 +76,7 @@ public class GroupingPojoTranslationTest extends CompilerTestBase {
.rebalance().setParallelism(4);
data.groupBy("a").withPartitioner(new TestPartitionerInt())
- .reduceGroup(new IdentityGroupReducer<Pojo2>())
+ .reduceGroup(new IdentityGroupReducerCombinable<Pojo2>())
.print();
Plan p = env.createProgramPlan();
@@ -106,7 +106,7 @@ public class GroupingPojoTranslationTest extends CompilerTestBase {
data.groupBy("a").withPartitioner(new TestPartitionerInt())
.sortGroup("b", Order.ASCENDING)
- .reduceGroup(new IdentityGroupReducer<Pojo3>())
+ .reduceGroup(new IdentityGroupReducerCombinable<Pojo3>())
.print();
Plan p = env.createProgramPlan();
@@ -137,7 +137,7 @@ public class GroupingPojoTranslationTest extends CompilerTestBase {
data.groupBy("a").withPartitioner(new TestPartitionerInt())
.sortGroup("b", Order.ASCENDING)
.sortGroup("c", Order.DESCENDING)
- .reduceGroup(new IdentityGroupReducer<Pojo4>())
+ .reduceGroup(new IdentityGroupReducerCombinable<Pojo4>())
.print();
Plan p = env.createProgramPlan();
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
index 72fb81b..49f44f5 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.testfunctions.DummyReducer;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.junit.Test;
@@ -108,7 +108,7 @@ public class GroupingTupleTranslationTest extends CompilerTestBase {
.rebalance().setParallelism(4);
data.groupBy(0).withPartitioner(new TestPartitionerInt())
- .reduceGroup(new IdentityGroupReducer<Tuple2<Integer,Integer>>())
+ .reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Integer,Integer>>())
.print();
Plan p = env.createProgramPlan();
@@ -138,7 +138,7 @@ public class GroupingTupleTranslationTest extends CompilerTestBase {
data.groupBy(0).withPartitioner(new TestPartitionerInt())
.sortGroup(1, Order.ASCENDING)
- .reduceGroup(new IdentityGroupReducer<Tuple3<Integer,Integer,Integer>>())
+ .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Integer,Integer,Integer>>())
.print();
Plan p = env.createProgramPlan();
@@ -169,7 +169,7 @@ public class GroupingTupleTranslationTest extends CompilerTestBase {
data.groupBy(0).withPartitioner(new TestPartitionerInt())
.sortGroup(1, Order.ASCENDING)
.sortGroup(2, Order.DESCENDING)
- .reduceGroup(new IdentityGroupReducer<Tuple4<Integer,Integer,Integer,Integer>>())
+ .reduceGroup(new IdentityGroupReducerCombinable<Tuple4<Integer,Integer,Integer,Integer>>())
.print();
Plan p = env.createProgramPlan();
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
index 8eedee1..ff429b8 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
@@ -243,7 +243,7 @@ public class JoinCustomPartitioningTest extends CompilerTestBase {
.distinct(0, 1)
.groupBy(1)
.sortGroup(0, Order.ASCENDING)
- .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1");
+ .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1");
grouped
.join(partitioned, JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(0)
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
index 95ee4de..9c2d0d2 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.junit.Test;
@@ -49,7 +49,7 @@ public class PartitionOperatorTest extends CompilerTestBase {
public int partition(Long key, int numPartitions) { return key.intValue(); }
}, 1)
.groupBy(1)
- .reduceGroup(new IdentityGroupReducer<Tuple2<Long,Long>>())
+ .reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Long,Long>>())
.print();
Plan p = env.createProgramPlan();
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCoGrouper.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCoGrouper.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCoGrouper.java
new file mode 100644
index 0000000..9d8ac2e
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCoGrouper.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.testfunctions;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.util.Collector;
+
+public class IdentityCoGrouper<T> implements CoGroupFunction<T, T, T> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void coGroup(Iterable<T> first, Iterable<T> second, Collector<T> out) {}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCrosser.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCrosser.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCrosser.java
new file mode 100644
index 0000000..54b2785
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCrosser.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.testfunctions;
+
+import org.apache.flink.api.common.functions.CrossFunction;
+
+public class IdentityCrosser<T> implements CrossFunction<T, T, T> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public T cross(T first, T second) {
+ return first;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
index 11fd044..da4ef17 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
@@ -23,8 +23,6 @@ import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
import org.apache.flink.util.Collector;
-
-@Combinable
public class IdentityGroupReducer<T> extends RichGroupReduceFunction<T, T> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducerCombinable.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducerCombinable.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducerCombinable.java
new file mode 100644
index 0000000..ce24bb6
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducerCombinable.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.testfunctions;
+
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
+import org.apache.flink.util.Collector;
+
+@Combinable
+public class IdentityGroupReducerCombinable<T> extends RichGroupReduceFunction<T, T> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void reduce(Iterable<T> values, Collector<T> out) {
+ for (T next : values) {
+ out.collect(next);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityJoiner.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityJoiner.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityJoiner.java
new file mode 100644
index 0000000..faca2ce
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityJoiner.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.testfunctions;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+
+public class IdentityJoiner<T> implements JoinFunction<T, T, T> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public T join(T first, T second) {
+ return first;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java
deleted file mode 100644
index 6a84c44..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.java.record.functions.CoGroupFunction;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-public class DummyCoGroupStub extends CoGroupFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out) {
- while (records1.hasNext()) {
- out.collect(records1.next());
- }
-
- while (records2.hasNext()) {
- out.collect(records2.next());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java
deleted file mode 100644
index 8ee2285..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import org.apache.flink.api.java.record.functions.CrossFunction;
-import org.apache.flink.types.Record;
-
-@SuppressWarnings("deprecation")
-public class DummyCrossStub extends CrossFunction {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Record cross(Record first, Record second) throws Exception {
- return first;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java
deleted file mode 100644
index 0c816e7..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-
-public final class DummyInputFormat extends DelimitedInputFormat {
- private static final long serialVersionUID = 1L;
-
- private final IntValue integer = new IntValue(1);
-
- @Override
- public Record readRecord(Record target, byte[] bytes, int offset, int numBytes) {
- target.setField(0, this.integer);
- target.setField(1, this.integer);
- return target;
- }
-
- @Override
- public FileBaseStatistics getStatistics(BaseStatistics cachedStatistics) {
- return (cachedStatistics instanceof FileBaseStatistics) ? (FileBaseStatistics) cachedStatistics : null;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java
deleted file mode 100644
index d00be6e..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirstExcept;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-@ConstantFieldsFirstExcept({})
-public class DummyMatchStub extends JoinFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void join(Record value1, Record value2, Collector<Record> out) throws Exception {
- out.collect(value1);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java
deleted file mode 100644
index 444b48e..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-public class DummyNonPreservingMatchStub extends JoinFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void join(Record value1, Record value2, Collector<Record> out) throws Exception {
- out.collect(value1);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java
deleted file mode 100644
index 1bbe24c..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.util;
-
-
-import org.apache.flink.api.java.record.io.DelimitedOutputFormat;
-import org.apache.flink.types.Record;
-
-
-public final class DummyOutputFormat extends DelimitedOutputFormat {
- private static final long serialVersionUID = 1L;
-
- @Override
- public int serializeRecord(Record rec, byte[] target) throws Exception {
- return 0;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java
deleted file mode 100644
index cccc6cb..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsExcept;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-@ConstantFieldsExcept({})
-public final class IdentityMap extends MapFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void map(Record record, Collector<Record> out) throws Exception {
- out.collect(record);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java
deleted file mode 100644
index f45745d..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsExcept;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-@ConstantFieldsExcept({})
-public final class IdentityReduce extends ReduceFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
- while (records.hasNext()) {
- out.collect(records.next());
- }
- }
-}