You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/05/05 23:03:40 UTC
[1/4] flink git commit: [FLINK-1951] Fix NullPointerException in
delta iteration due to missing temp
Repository: flink
Updated Branches:
refs/heads/master 60ec68308 -> bd96ba8d1
[FLINK-1951] Fix NullPointerException in delta iteration due to missing temp
This closes #641
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/adb321d6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/adb321d6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/adb321d6
Branch: refs/heads/master
Commit: adb321d61cc783b3a2a78f4e707104d75e1d63c0
Parents: 60ec683
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Apr 30 17:34:02 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue May 5 22:56:36 2015 +0200
----------------------------------------------------------------------
.../plantranslate/JobGraphGenerator.java | 5 +-
.../plantranslate/TempInIterationsTest.java | 81 ++++++++++++++++++++
2 files changed, 84 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/adb321d6/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index dc21c13..2630019 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -1163,8 +1163,9 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
final TempMode tm = channel.getTempMode();
boolean needsMemory = false;
- // Don't add a pipeline breaker if the data exchange is already blocking.
- if (tm.breaksPipeline() && channel.getDataExchangeMode() != DataExchangeMode.BATCH) {
+ // Don't add a pipeline breaker if the data exchange is already blocking, EXCEPT the channel is within an iteration.
+ if (tm.breaksPipeline() &&
+ (channel.isOnDynamicPath() || channel.getDataExchangeMode() != DataExchangeMode.BATCH) ) {
config.setInputAsynchronouslyMaterialized(inputNum, true);
needsMemory = true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/adb321d6/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/TempInIterationsTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/TempInIterationsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/TempInIterationsTest.java
new file mode 100644
index 0000000..15cb03f
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/TempInIterationsTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plantranslate;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+public class TempInIterationsTest {
+
+ /*
+ * Tests whether temps barriers are correctly set in within iterations
+ */
+ @Test
+ public void testTempInIterationTest() throws Exception {
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple2<Long, Long>> input = env.readCsvFile("file:///does/not/exist").types(Long.class, Long.class);
+
+ DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
+ input.iterateDelta(input, 1, 0);
+
+ DataSet<Tuple2<Long, Long>> update = iteration.getWorkset()
+ .join(iteration.getSolutionSet()).where(0).equalTo(0)
+ .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+ iteration.closeWith(update, update)
+ .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+
+
+ JavaPlan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = (new Optimizer(new Configuration())).compile(plan);
+
+ JobGraphGenerator jgg = new JobGraphGenerator();
+ JobGraph jg = jgg.compileJobGraph(oPlan);
+
+ boolean solutionSetUpdateChecked = false;
+ for(AbstractJobVertex v : jg.getVertices()) {
+ if(v.getName().equals("SolutionSet Delta")) {
+
+ // check if input of solution set delta is temped
+ TaskConfig tc = new TaskConfig(v.getConfiguration());
+ assertTrue(tc.isInputAsynchronouslyMaterialized(0));
+ solutionSetUpdateChecked = true;
+ }
+ }
+ assertTrue(solutionSetUpdateChecked);
+
+ }
+
+}
[3/4] flink git commit: [FLINK-1682] Ported optimizer unit tests from
Record API to Java API
Posted by fh...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
index f42eb02..41e0eb9 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.translation.JavaPlan;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.optimizer.dataproperties.GlobalProperties;
@@ -52,7 +53,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
.where(0,1).equalTo(0,1).with(new MockJoin());
- joined.print();
+ joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -73,7 +74,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
.where(0,1).equalTo(2,1).with(new MockJoin());
- joined.print();
+ joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -96,7 +97,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
.where(0,1).equalTo(0,1).with(new MockJoin());
- joined.print();
+ joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -119,7 +120,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
.where(0,1).equalTo(2,1).with(new MockJoin());
- joined.print();
+ joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -142,7 +143,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
.where(0,1).equalTo(2,1).with(new MockJoin());
- joined.print();
+ joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -164,7 +165,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
.where(0,1).equalTo(2,1).with(new MockJoin());
- joined.print();
+ joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -187,7 +188,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
.where(0,1).equalTo(2,1).with(new MockJoin());
- joined.print();
+ joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -212,7 +213,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
.where(0,1).equalTo(0,1).with(new MockJoin());
- joined.print();
+ joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -238,7 +239,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
.where(0,1).equalTo(2,1).with(new MockJoin());
- joined.print();
+ joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -263,7 +264,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
.where(0,1).equalTo(2,1).with(new MockJoin());
- joined.print();
+ joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -288,7 +289,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
.where(0,2).equalTo(2,1).with(new MockJoin());
- joined.print();
+ joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -313,7 +314,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
.where(0,2).equalTo(2,1).with(new MockJoin());
- joined.print();
+ joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -338,7 +339,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
.where(0,2).equalTo(1,2).with(new MockJoin());
- joined.print();
+ joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -363,7 +364,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
.where(0,2).equalTo(1,2).with(new MockJoin());
- joined.print();
+ joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -384,7 +385,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.coGroup(set2)
.where(0,1).equalTo(0,1).with(new MockCoGroup());
- coGrouped.print();
+ coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -405,7 +406,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.coGroup(set2)
.where(0,1).equalTo(2,1).with(new MockCoGroup());
- coGrouped.print();
+ coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -428,7 +429,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.coGroup(set2)
.where(0,1).equalTo(0,1).with(new MockCoGroup());
- coGrouped.print();
+ coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -451,7 +452,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.coGroup(set2)
.where(0,1).equalTo(2,1).with(new MockCoGroup());
- coGrouped.print();
+ coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -473,7 +474,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.withForwardedFields("2;1"))
.where(0,1).equalTo(2, 1).with(new MockCoGroup());
- coGrouped.print();
+ coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -495,7 +496,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.coGroup(set2)
.where(0, 1).equalTo(2, 1).with(new MockCoGroup());
- coGrouped.print();
+ coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -517,7 +518,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.withForwardedFields("2"))
.where(0,1).equalTo(2,1).with(new MockCoGroup());
- coGrouped.print();
+ coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -541,7 +542,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.withForwardedFields("0;1"))
.where(0, 1).equalTo(0, 1).with(new MockCoGroup());
- coGrouped.print();
+ coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -566,7 +567,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.withForwardedFields("1;2"))
.where(0, 1).equalTo(2, 1).with(new MockCoGroup());
- coGrouped.print();
+ coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -590,7 +591,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.withForwardedFields("2;1"))
.where(0, 1).equalTo(2, 1).with(new MockCoGroup());
- coGrouped.print();
+ coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -614,7 +615,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.withForwardedFields("1"))
.where(0, 2).equalTo(2, 1).with(new MockCoGroup());
- coGrouped.print();
+ coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -638,7 +639,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.withForwardedFields("1"))
.where(0, 2).equalTo(2, 1).with(new MockCoGroup());
- coGrouped.print();
+ coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -662,7 +663,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.withForwardedFields("2"))
.where(0, 2).equalTo(1, 2).with(new MockCoGroup());
- coGrouped.print();
+ coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -686,7 +687,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
.withForwardedFields("1"))
.where(0, 2).equalTo(1, 2).with(new MockCoGroup());
- coGrouped.print();
+ coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
index 84f6377..68e8a41 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.optimizer;
import static org.junit.Assert.*;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
import org.apache.flink.optimizer.util.CompilerTestBase;
@@ -50,7 +52,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
.map(new IdentityMapper<Long>())
.withBroadcastSet(source, "bc");
- result.print();
+ result.output(new DiscardingOutputFormat<Long>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -84,7 +86,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
.withBroadcastSet(bcInput1, "bc1")
.withBroadcastSet(bcInput2, "bc2");
- result.print();
+ result.output(new DiscardingOutputFormat<Long>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -123,7 +125,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
.withBroadcastSet(bcInput1, "bc1");
- iteration.closeWith(result).print();
+ iteration.closeWith(result).output(new DiscardingOutputFormat<Long>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -154,7 +156,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
initialSource
.map(new IdentityMapper<Long>())
.cross(initialSource).withParameters(conf)
- .print();
+ .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
Plan p = env.createProgramPlan();
@@ -176,7 +178,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
initialSource
.map(new IdentityMapper<Long>())
.cross(initialSource).withParameters(conf)
- .print();
+ .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
Plan p = env.createProgramPlan();
@@ -199,7 +201,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
initialSource
.map(new IdentityMapper<Long>())
.cross(initialSource).withParameters(conf)
- .print();
+ .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
Plan p = env.createProgramPlan();
@@ -222,7 +224,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
initialSource
.map(new IdentityMapper<Long>())
.cross(initialSource).withParameters(conf)
- .print();
+ .output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
Plan p = env.createProgramPlan();
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
index d6b9444..dc9f2e5 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.util.FieldSet;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.translation.JavaPlan;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -64,7 +65,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
data.getSplitDataProperties()
.splitsPartitionedBy(0);
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple2<Long,String>>());
JavaPlan plan = env.createProgramPlan();
@@ -97,7 +98,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
data.getSplitDataProperties()
.splitsPartitionedBy(1, 0);
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple2<Long,String>>());
JavaPlan plan = env.createProgramPlan();
@@ -129,7 +130,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
data.getSplitDataProperties()
.splitsPartitionedBy("*");
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple3<Long,SomePojo,String>>());
JavaPlan plan = env.createProgramPlan();
@@ -161,7 +162,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
data.getSplitDataProperties()
.splitsPartitionedBy("f1");
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple3<Long,SomePojo,String>>());
JavaPlan plan = env.createProgramPlan();
@@ -193,7 +194,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
data.getSplitDataProperties()
.splitsPartitionedBy("f1.stringField");
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -225,7 +226,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
data.getSplitDataProperties()
.splitsPartitionedBy("f1.intField; f2");
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -258,7 +259,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
data.getSplitDataProperties()
.splitsPartitionedBy("byDate", 1, 0);
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -293,7 +294,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
.splitsPartitionedBy(0)
.splitsGroupedBy(0);
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -327,7 +328,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
.splitsPartitionedBy(0)
.splitsGroupedBy(1, 0);
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -362,7 +363,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
.splitsPartitionedBy(1)
.splitsGroupedBy(0);
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -396,7 +397,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
.splitsPartitionedBy(0, 1)
.splitsGroupedBy(0);
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -429,7 +430,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
.splitsPartitionedBy("f2")
.splitsGroupedBy("f2");
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -463,7 +464,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
.splitsPartitionedBy("f1.intField")
.splitsGroupedBy("f0; f1.intField");
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -497,7 +498,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
.splitsPartitionedBy("f1.intField")
.splitsGroupedBy("f1");
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -530,7 +531,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
.splitsPartitionedBy("f1")
.splitsGroupedBy("f1.stringField");
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -565,7 +566,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
.splitsPartitionedBy(1)
.splitsOrderedBy(new int[]{1}, new Order[]{Order.ASCENDING});
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -599,7 +600,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
.splitsPartitionedBy(1)
.splitsOrderedBy(new int[]{1, 0}, new Order[]{Order.ASCENDING, Order.DESCENDING});
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -634,7 +635,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
.splitsPartitionedBy(0)
.splitsOrderedBy(new int[]{1}, new Order[]{Order.ASCENDING});
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -668,7 +669,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
.splitsPartitionedBy(0, 1)
.splitsOrderedBy(new int[]{1}, new Order[]{Order.DESCENDING});
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -701,7 +702,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
.splitsPartitionedBy("f1.intField")
.splitsOrderedBy("f0; f1.intField", new Order[]{Order.ASCENDING, Order.DESCENDING});
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -735,7 +736,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
.splitsPartitionedBy("f1.intField")
.splitsOrderedBy("f1", new Order[]{Order.DESCENDING});
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -768,7 +769,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
.splitsPartitionedBy("f1")
.splitsOrderedBy("f1.stringField", new Order[]{Order.ASCENDING});
- data.print();
+ data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -808,7 +809,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
data2.getSplitDataProperties()
.splitsPartitionedBy("byDate", 0);
- data1.union(data2).print();
+ data1.union(data2).output(new DiscardingOutputFormat<Tuple2<Long, String>>());
JavaPlan plan = env.createProgramPlan();
@@ -856,7 +857,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
data2.getSplitDataProperties()
.splitsPartitionedBy("byDate", 0);
- data1.union(data2).print();
+ data1.union(data2).output(new DiscardingOutputFormat<Tuple2<Long, String>>());
JavaPlan plan = env.createProgramPlan();
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
index 3af64fc..b0dca66 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
@@ -20,36 +20,35 @@ package org.apache.flink.optimizer;
import static org.junit.Assert.fail;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityReduce;
import org.junit.Test;
/**
* This test case has been created to validate a bug that occurred when
* the ReduceOperator was used without a grouping key.
*/
-@SuppressWarnings({"serial", "deprecation"})
+@SuppressWarnings({"serial"})
public class ReduceAllTest extends CompilerTestBase {
@Test
public void testReduce() {
// construct the plan
- FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
- ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce()).name("Reduce1").input(source).build();
- FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
- sink.setInput(reduce1);
- Plan plan = new Plan(sink, "AllReduce Test");
- plan.setDefaultParallelism(DEFAULT_PARALLELISM);
-
-
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+ DataSet<Long> set1 = env.generateSequence(0,1);
+
+ set1.reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce1")
+ .output(new DiscardingOutputFormat<Long>()).name("Sink");
+
+ JavaPlan plan = env.createProgramPlan();
+
try {
OptimizedPlan oPlan = compileNoStats(plan);
JobGraphGenerator jobGen = new JobGraphGenerator();
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
index 25643a4..26af380 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
@@ -44,7 +44,7 @@ import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
-@SuppressWarnings({"serial", "deprecation"})
+@SuppressWarnings({"serial", "unchecked"})
public class ReplicatingDataSourceTest extends CompilerTestBase {
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
index 3a24ce1..00ada2a 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
import org.apache.flink.api.common.operators.util.FieldSet;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.translation.JavaPlan;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.optimizer.dataproperties.GlobalProperties;
@@ -57,7 +58,7 @@ public class SemanticPropertiesAPIToPlanTest extends CompilerTestBase {
.groupBy(1)
.reduce(new MockReducer()).withForwardedFields("*");
- set.print();
+ set.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
@@ -118,7 +119,7 @@ public class SemanticPropertiesAPIToPlanTest extends CompilerTestBase {
.reduce(new MockReducer()).withForwardedFields("f1->f2");
DataSet<Tuple3<Integer, Integer, Integer>> out = in1.join(in2).where(1).equalTo(2).with(new MockJoin());
- out.print();
+ out.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileWithStats(plan);
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java
index 65e5025..a94f845 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.optimizer;
import static org.junit.Assert.*;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.junit.Test;
import org.apache.flink.api.common.Plan;
@@ -30,7 +31,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
@@ -51,12 +52,12 @@ public class SortPartialReuseTest extends CompilerTestBase {
.map(new IdentityMapper<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
.groupBy(0, 1)
- .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
+ .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
.groupBy(0)
- .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>())
-
- .print();
+ .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>())
+
+ .output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -96,12 +97,12 @@ public class SortPartialReuseTest extends CompilerTestBase {
.map(new IdentityMapper<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
.groupBy(0, 1)
- .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
+ .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
.groupBy(1)
- .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>())
-
- .print();
+ .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>())
+
+ .output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
index 1001626..f041b2a 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.*;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.optimizer.plan.BinaryUnionPlanNode;
import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
@@ -50,8 +51,8 @@ public class UnionBetweenDynamicAndStaticPathTest extends CompilerTestBase {
DataSet<Long> result = iteration.closeWith(
input2.union(input2).union(iteration.union(iteration)));
- result.print();
- result.print();
+ result.output(new DiscardingOutputFormat<Long>());
+ result.output(new DiscardingOutputFormat<Long>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -102,8 +103,8 @@ public class UnionBetweenDynamicAndStaticPathTest extends CompilerTestBase {
DataSet<Long> iterResult = iteration
.closeWith(iteration.union(iteration).union(input2.union(input2)));
- iterResult.print();
- iterResult.print();
+ iterResult.output(new DiscardingOutputFormat<Long>());
+ iterResult.output(new DiscardingOutputFormat<Long>());
Plan p = env.createProgramPlan();
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
index 2e52565..fee6e17 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
@@ -26,13 +26,12 @@ import org.apache.flink.api.common.operators.base.FlatMapOperatorBase;
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.types.IntValue;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Visitor;
import org.junit.Assert;
@@ -45,37 +44,26 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.PlanNode;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityReduce;
-@SuppressWarnings({"serial", "deprecation"})
+@SuppressWarnings({"serial"})
public class UnionPropertyPropagationTest extends CompilerTestBase {
- @SuppressWarnings("unchecked")
@Test
- public void testUnionPropertyOldApiPropagation() {
+ public void testUnion1() {
// construct the plan
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+ DataSet<Long> sourceA = env.generateSequence(0,1);
+ DataSet<Long> sourceB = env.generateSequence(0,1);
- FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), IN_FILE);
- FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), IN_FILE);
-
- ReduceOperator redA = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
- .input(sourceA)
- .build();
- ReduceOperator redB = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
- .input(sourceB)
- .build();
-
- ReduceOperator globalRed = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).build();
- globalRed.addInput(redA);
- globalRed.addInput(redB);
-
- FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, globalRed);
-
- // return the plan
- Plan plan = new Plan(sink, "Union Property Propagation");
+ DataSet<Long> redA = sourceA.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>());
+ DataSet<Long> redB = sourceB.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>());
+
+ redA.union(redB).groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
+ .output(new DiscardingOutputFormat<Long>());
+
+ JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileNoStats(plan);
@@ -88,7 +76,7 @@ public class UnionPropertyPropagationTest extends CompilerTestBase {
@Override
public boolean preVisit(PlanNode visitable) {
- if (visitable instanceof SingleInputPlanNode && visitable.getProgramOperator() instanceof ReduceOperator) {
+ if (visitable instanceof SingleInputPlanNode && visitable.getProgramOperator() instanceof GroupReduceOperatorBase) {
for (Channel inConn : visitable.getInputs()) {
Assert.assertTrue("Reduce should just forward the input if it is already partitioned",
inConn.getShipStrategy() == ShipStrategyType.FORWARD);
@@ -107,7 +95,7 @@ public class UnionPropertyPropagationTest extends CompilerTestBase {
}
@Test
- public void testUnionNewApiAssembly() {
+ public void testUnion2() {
final int NUM_INPUTS = 4;
// construct the plan it will be multiple flat maps, all unioned
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
index e81e0ec..65dd2b3 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.optimizer;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.optimizer.util.CompilerTestBase;
@@ -40,8 +41,8 @@ public class UnionReplacementTest extends CompilerTestBase {
DataSet<String> union = input1.union(input2);
- union.print();
- union.print();
+ union.output(new DiscardingOutputFormat<String>());
+ union.output(new DiscardingOutputFormat<String>());
Plan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileNoStats(plan);
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
index 321ca5a..32bd6e9 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
@@ -24,11 +24,13 @@ import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.junit.Test;
@@ -40,12 +42,13 @@ public class WorksetIterationCornerCasesTest extends CompilerTestBase {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 100).map(new Duplicator<Long>());
+ DataSet<Tuple2<Long, Long>> input = env.readCsvFile("/tmp/some.csv").types(Long.class, Long.class);
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = input.iterateDelta(input, 100, 1);
- DataSet<Tuple2<Long, Long>> iterEnd = iteration.getWorkset().map(new TestMapper<Tuple2<Long,Long>>());
- iteration.closeWith(iterEnd, iterEnd).print();
+ DataSet<Tuple2<Long, Long>> iterEnd = iteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>());
+ iteration.closeWith(iterEnd, iterEnd)
+ .output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -61,18 +64,5 @@ public class WorksetIterationCornerCasesTest extends CompilerTestBase {
fail(e.getMessage());
}
}
-
- private static final class Duplicator<T> implements MapFunction<T, Tuple2<T, T>> {
- @Override
- public Tuple2<T, T> map(T value) {
- return new Tuple2<T, T>(value, value);
- }
- }
-
- private static final class TestMapper<T> implements MapFunction<T, T> {
- @Override
- public T map(T value) {
- return value;
- }
- }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
index 27f367f..46b9357 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
@@ -25,25 +25,21 @@ import static org.junit.Assert.fail;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.record.operators.DeltaIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.JoinOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyMatchStub;
-import org.apache.flink.optimizer.util.DummyNonPreservingMatchStub;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityMap;
-import org.apache.flink.optimizer.util.IdentityReduce;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityJoiner;
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.types.LongValue;
import org.junit.Test;
@@ -51,7 +47,6 @@ import org.junit.Test;
* Tests that validate optimizer choices when using operators that are requesting certain specific execution
* strategies.
*/
-@SuppressWarnings("deprecation")
public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
private static final long serialVersionUID = 1L;
@@ -66,7 +61,7 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
@Test
public void testRecordApiWithDeferredSoltionSetUpdateWithMapper() {
- Plan plan = getRecordTestPlan(false, true);
+ Plan plan = getTestPlan(false, true);
OptimizedPlan oPlan;
try {
@@ -112,7 +107,7 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
@Test
public void testRecordApiWithDeferredSoltionSetUpdateWithNonPreservingJoin() {
- Plan plan = getRecordTestPlan(false, false);
+ Plan plan = getTestPlan(false, false);
OptimizedPlan oPlan;
try {
@@ -156,7 +151,7 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
@Test
public void testRecordApiWithDirectSoltionSetUpdate() {
- Plan plan = getRecordTestPlan(true, false);
+ Plan plan = getTestPlan(true, false);
OptimizedPlan oPlan;
try {
@@ -197,52 +192,45 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
new JobGraphGenerator().compileJobGraph(oPlan);
}
- private Plan getRecordTestPlan(boolean joinPreservesSolutionSet, boolean mapBeforeSolutionDelta) {
- FileDataSource solutionSetInput = new FileDataSource(new DummyInputFormat(), IN_FILE, "Solution Set");
- FileDataSource worksetInput = new FileDataSource(new DummyInputFormat(), IN_FILE, "Workset");
-
- FileDataSource invariantInput = new FileDataSource(new DummyInputFormat(), IN_FILE, "Invariant Input");
-
- DeltaIteration iteration = new DeltaIteration(0, ITERATION_NAME);
- iteration.setInitialSolutionSet(solutionSetInput);
- iteration.setInitialWorkset(worksetInput);
- iteration.setMaximumNumberOfIterations(100);
-
- JoinOperator joinWithInvariant = JoinOperator.builder(new DummyMatchStub(), LongValue.class, 0, 0)
- .input1(iteration.getWorkset())
- .input2(invariantInput)
- .name(JOIN_WITH_INVARIANT_NAME)
- .build();
-
- JoinOperator joinWithSolutionSet = JoinOperator.builder(
- joinPreservesSolutionSet ? new DummyMatchStub() : new DummyNonPreservingMatchStub(), LongValue.class, 0, 0)
- .input1(iteration.getSolutionSet())
- .input2(joinWithInvariant)
- .name(JOIN_WITH_SOLUTION_SET)
- .build();
-
- ReduceOperator nextWorkset = ReduceOperator.builder(new IdentityReduce(), LongValue.class, 0)
- .input(joinWithSolutionSet)
- .name(NEXT_WORKSET_REDUCER_NAME)
- .build();
-
- if (mapBeforeSolutionDelta) {
- MapOperator mapper = MapOperator.builder(new IdentityMap())
- .input(joinWithSolutionSet)
- .name(SOLUTION_DELTA_MAPPER_NAME)
- .build();
- iteration.setSolutionSetDelta(mapper);
- } else {
- iteration.setSolutionSetDelta(joinWithSolutionSet);
+ private Plan getTestPlan(boolean joinPreservesSolutionSet, boolean mapBeforeSolutionDelta) {
+
+ // construct the plan
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+ DataSet<Tuple2<Long, Long>> solSetInput = env.readCsvFile("/tmp/sol.csv").types(Long.class, Long.class).name("Solution Set");
+ DataSet<Tuple2<Long, Long>> workSetInput = env.readCsvFile("/tmp/sol.csv").types(Long.class, Long.class).name("Workset");
+ DataSet<Tuple2<Long, Long>> invariantInput = env.readCsvFile("/tmp/sol.csv").types(Long.class, Long.class).name("Invariant Input");
+
+ DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> deltaIt = solSetInput.iterateDelta(workSetInput, 100, 0).name(ITERATION_NAME);
+
+ DataSet<Tuple2<Long, Long>> join1 = deltaIt.getWorkset().join(invariantInput).where(0).equalTo(0)
+ .with(new IdentityJoiner<Tuple2<Long, Long>>())
+ .withForwardedFieldsFirst("*").name(JOIN_WITH_INVARIANT_NAME);
+
+ DataSet<Tuple2<Long, Long>> join2 = deltaIt.getSolutionSet().join(join1).where(0).equalTo(0)
+ .with(new IdentityJoiner<Tuple2<Long, Long>>())
+ .name(JOIN_WITH_SOLUTION_SET);
+ if(joinPreservesSolutionSet) {
+ ((JoinOperator<?,?,?>)join2).withForwardedFieldsFirst("*");
}
-
- iteration.setNextWorkset(nextWorkset);
- FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, iteration, "Sink");
-
- Plan plan = new Plan(sink);
- plan.setDefaultParallelism(DEFAULT_PARALLELISM);
- return plan;
+ DataSet<Tuple2<Long, Long>> nextWorkset = join2.groupBy(0).reduceGroup(new IdentityGroupReducer<Tuple2<Long, Long>>())
+ .withForwardedFields("*").name(NEXT_WORKSET_REDUCER_NAME);
+
+ if(mapBeforeSolutionDelta) {
+
+ DataSet<Tuple2<Long, Long>> mapper = join2.map(new IdentityMapper<Tuple2<Long, Long>>())
+ .withForwardedFields("*").name(SOLUTION_DELTA_MAPPER_NAME);
+
+ deltaIt.closeWith(mapper, nextWorkset)
+ .output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
+ }
+ else {
+ deltaIt.closeWith(join2, nextWorkset)
+ .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+ }
+
+ return env.createProgramPlan();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
index d52181d..346e702 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
@@ -246,7 +246,7 @@ public class CoGroupCustomPartitioningTest extends CompilerTestBase {
.distinct(0, 1)
.groupBy(1)
.sortGroup(0, Order.ASCENDING)
- .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1");
+ .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1");
grouped
.coGroup(partitioned).where(0).equalTo(0)
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
index 5758c86..17a7659 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.junit.Test;
@@ -57,7 +57,7 @@ public class CustomPartitioningGlobalOptimizationTest extends CompilerTestBase {
.withPartitioner(partitioner);
joined.groupBy(1).withPartitioner(partitioner)
- .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>())
+ .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>())
.print();
Plan p = env.createProgramPlan();
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
index 0408ca9..23f4812 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.testfunctions.DummyReducer;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.junit.Test;
@@ -84,7 +84,7 @@ public class GroupingKeySelectorTranslationTest extends CompilerTestBase {
data.groupBy(new TestKeySelector<Tuple2<Integer,Integer>>())
.withPartitioner(new TestPartitionerInt())
- .reduceGroup(new IdentityGroupReducer<Tuple2<Integer,Integer>>())
+ .reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Integer,Integer>>())
.print();
Plan p = env.createProgramPlan();
@@ -115,7 +115,7 @@ public class GroupingKeySelectorTranslationTest extends CompilerTestBase {
data.groupBy(new TestKeySelector<Tuple3<Integer,Integer,Integer>>())
.withPartitioner(new TestPartitionerInt())
.sortGroup(new TestKeySelector<Tuple3<Integer, Integer, Integer>>(), Order.ASCENDING)
- .reduceGroup(new IdentityGroupReducer<Tuple3<Integer,Integer,Integer>>())
+ .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Integer,Integer,Integer>>())
.print();
Plan p = env.createProgramPlan();
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
index 74e5c8c..54033ac 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.testfunctions.DummyReducer;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.junit.Test;
@@ -76,7 +76,7 @@ public class GroupingPojoTranslationTest extends CompilerTestBase {
.rebalance().setParallelism(4);
data.groupBy("a").withPartitioner(new TestPartitionerInt())
- .reduceGroup(new IdentityGroupReducer<Pojo2>())
+ .reduceGroup(new IdentityGroupReducerCombinable<Pojo2>())
.print();
Plan p = env.createProgramPlan();
@@ -106,7 +106,7 @@ public class GroupingPojoTranslationTest extends CompilerTestBase {
data.groupBy("a").withPartitioner(new TestPartitionerInt())
.sortGroup("b", Order.ASCENDING)
- .reduceGroup(new IdentityGroupReducer<Pojo3>())
+ .reduceGroup(new IdentityGroupReducerCombinable<Pojo3>())
.print();
Plan p = env.createProgramPlan();
@@ -137,7 +137,7 @@ public class GroupingPojoTranslationTest extends CompilerTestBase {
data.groupBy("a").withPartitioner(new TestPartitionerInt())
.sortGroup("b", Order.ASCENDING)
.sortGroup("c", Order.DESCENDING)
- .reduceGroup(new IdentityGroupReducer<Pojo4>())
+ .reduceGroup(new IdentityGroupReducerCombinable<Pojo4>())
.print();
Plan p = env.createProgramPlan();
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
index 72fb81b..49f44f5 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.testfunctions.DummyReducer;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.junit.Test;
@@ -108,7 +108,7 @@ public class GroupingTupleTranslationTest extends CompilerTestBase {
.rebalance().setParallelism(4);
data.groupBy(0).withPartitioner(new TestPartitionerInt())
- .reduceGroup(new IdentityGroupReducer<Tuple2<Integer,Integer>>())
+ .reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Integer,Integer>>())
.print();
Plan p = env.createProgramPlan();
@@ -138,7 +138,7 @@ public class GroupingTupleTranslationTest extends CompilerTestBase {
data.groupBy(0).withPartitioner(new TestPartitionerInt())
.sortGroup(1, Order.ASCENDING)
- .reduceGroup(new IdentityGroupReducer<Tuple3<Integer,Integer,Integer>>())
+ .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Integer,Integer,Integer>>())
.print();
Plan p = env.createProgramPlan();
@@ -169,7 +169,7 @@ public class GroupingTupleTranslationTest extends CompilerTestBase {
data.groupBy(0).withPartitioner(new TestPartitionerInt())
.sortGroup(1, Order.ASCENDING)
.sortGroup(2, Order.DESCENDING)
- .reduceGroup(new IdentityGroupReducer<Tuple4<Integer,Integer,Integer,Integer>>())
+ .reduceGroup(new IdentityGroupReducerCombinable<Tuple4<Integer,Integer,Integer,Integer>>())
.print();
Plan p = env.createProgramPlan();
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
index 8eedee1..ff429b8 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
@@ -243,7 +243,7 @@ public class JoinCustomPartitioningTest extends CompilerTestBase {
.distinct(0, 1)
.groupBy(1)
.sortGroup(0, Order.ASCENDING)
- .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1");
+ .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1");
grouped
.join(partitioned, JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(0)
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
index 95ee4de..9c2d0d2 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.junit.Test;
@@ -49,7 +49,7 @@ public class PartitionOperatorTest extends CompilerTestBase {
public int partition(Long key, int numPartitions) { return key.intValue(); }
}, 1)
.groupBy(1)
- .reduceGroup(new IdentityGroupReducer<Tuple2<Long,Long>>())
+ .reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Long,Long>>())
.print();
Plan p = env.createProgramPlan();
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCoGrouper.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCoGrouper.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCoGrouper.java
new file mode 100644
index 0000000..9d8ac2e
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCoGrouper.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.testfunctions;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.util.Collector;
+
+public class IdentityCoGrouper<T> implements CoGroupFunction<T, T, T> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void coGroup(Iterable<T> first, Iterable<T> second, Collector<T> out) {}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCrosser.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCrosser.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCrosser.java
new file mode 100644
index 0000000..54b2785
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCrosser.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.testfunctions;
+
+import org.apache.flink.api.common.functions.CrossFunction;
+
+public class IdentityCrosser<T> implements CrossFunction<T, T, T> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public T cross(T first, T second) {
+ return first;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
index 11fd044..da4ef17 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
@@ -23,8 +23,6 @@ import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
import org.apache.flink.util.Collector;
-
-@Combinable
public class IdentityGroupReducer<T> extends RichGroupReduceFunction<T, T> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducerCombinable.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducerCombinable.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducerCombinable.java
new file mode 100644
index 0000000..ce24bb6
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducerCombinable.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.testfunctions;
+
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
+import org.apache.flink.util.Collector;
+
+@Combinable
+public class IdentityGroupReducerCombinable<T> extends RichGroupReduceFunction<T, T> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void reduce(Iterable<T> values, Collector<T> out) {
+ for (T next : values) {
+ out.collect(next);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityJoiner.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityJoiner.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityJoiner.java
new file mode 100644
index 0000000..faca2ce
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityJoiner.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.testfunctions;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+
+public class IdentityJoiner<T> implements JoinFunction<T, T, T> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public T join(T first, T second) {
+ return first;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java
deleted file mode 100644
index 6a84c44..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.java.record.functions.CoGroupFunction;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-public class DummyCoGroupStub extends CoGroupFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out) {
- while (records1.hasNext()) {
- out.collect(records1.next());
- }
-
- while (records2.hasNext()) {
- out.collect(records2.next());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java
deleted file mode 100644
index 8ee2285..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import org.apache.flink.api.java.record.functions.CrossFunction;
-import org.apache.flink.types.Record;
-
-@SuppressWarnings("deprecation")
-public class DummyCrossStub extends CrossFunction {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Record cross(Record first, Record second) throws Exception {
- return first;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java
deleted file mode 100644
index 0c816e7..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-
-public final class DummyInputFormat extends DelimitedInputFormat {
- private static final long serialVersionUID = 1L;
-
- private final IntValue integer = new IntValue(1);
-
- @Override
- public Record readRecord(Record target, byte[] bytes, int offset, int numBytes) {
- target.setField(0, this.integer);
- target.setField(1, this.integer);
- return target;
- }
-
- @Override
- public FileBaseStatistics getStatistics(BaseStatistics cachedStatistics) {
- return (cachedStatistics instanceof FileBaseStatistics) ? (FileBaseStatistics) cachedStatistics : null;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java
deleted file mode 100644
index d00be6e..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirstExcept;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-@ConstantFieldsFirstExcept({})
-public class DummyMatchStub extends JoinFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void join(Record value1, Record value2, Collector<Record> out) throws Exception {
- out.collect(value1);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java
deleted file mode 100644
index 444b48e..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-public class DummyNonPreservingMatchStub extends JoinFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void join(Record value1, Record value2, Collector<Record> out) throws Exception {
- out.collect(value1);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java
deleted file mode 100644
index 1bbe24c..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.util;
-
-
-import org.apache.flink.api.java.record.io.DelimitedOutputFormat;
-import org.apache.flink.types.Record;
-
-
-public final class DummyOutputFormat extends DelimitedOutputFormat {
- private static final long serialVersionUID = 1L;
-
- @Override
- public int serializeRecord(Record rec, byte[] target) throws Exception {
- return 0;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java
deleted file mode 100644
index cccc6cb..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsExcept;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-@ConstantFieldsExcept({})
-public final class IdentityMap extends MapFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void map(Record record, Collector<Record> out) throws Exception {
- out.collect(record);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java
deleted file mode 100644
index f45745d..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsExcept;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-@ConstantFieldsExcept({})
-public final class IdentityReduce extends ReduceFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
- while (records.hasNext()) {
- out.collect(records.next());
- }
- }
-}
[4/4] flink git commit: [FLINK-1682] Ported optimizer unit tests from
Record API to Java API
Posted by fh...@apache.org.
[FLINK-1682] Ported optimizer unit tests from Record API to Java API
This closes #627
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bd96ba8d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bd96ba8d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bd96ba8d
Branch: refs/heads/master
Commit: bd96ba8d1bbdc494ac88b98a6469255572f4a9fc
Parents: adb321d
Author: Fabian Hueske <fh...@apache.org>
Authored: Sat Apr 25 01:30:11 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue May 5 23:01:47 2015 +0200
----------------------------------------------------------------------
.../optimizer/AdditionalOperatorsTest.java | 64 +--
.../optimizer/BranchingPlansCompilerTest.java | 509 +++++++------------
.../BroadcastVariablePipelinebreakerTest.java | 7 +-
.../CachedMatchStrategyCompilerTest.java | 7 +-
.../optimizer/CoGroupSolutionSetFirstTest.java | 3 +-
.../flink/optimizer/DisjointDataFlowsTest.java | 7 +-
.../optimizer/DistinctCompilationTest.java | 10 +-
.../apache/flink/optimizer/GroupOrderTest.java | 100 ++--
.../optimizer/HardPlansCompilationTest.java | 55 +-
.../flink/optimizer/IterationsCompilerTest.java | 22 +-
.../flink/optimizer/NestedIterationsTest.java | 9 +-
.../flink/optimizer/ParallelismChangeTest.java | 250 ++++-----
.../flink/optimizer/PartitionPushdownTest.java | 5 +-
.../optimizer/PartitioningReusageTest.java | 57 ++-
.../flink/optimizer/PipelineBreakerTest.java | 16 +-
.../flink/optimizer/PropertyDataSourceTest.java | 49 +-
.../apache/flink/optimizer/ReduceAllTest.java | 31 +-
.../optimizer/ReplicatingDataSourceTest.java | 2 +-
.../SemanticPropertiesAPIToPlanTest.java | 5 +-
.../flink/optimizer/SortPartialReuseTest.java | 19 +-
.../UnionBetweenDynamicAndStaticPathTest.java | 9 +-
.../optimizer/UnionPropertyPropagationTest.java | 48 +-
.../flink/optimizer/UnionReplacementTest.java | 5 +-
.../WorksetIterationCornerCasesTest.java | 24 +-
.../WorksetIterationsRecordApiCompilerTest.java | 110 ++--
.../CoGroupCustomPartitioningTest.java | 4 +-
...ustomPartitioningGlobalOptimizationTest.java | 4 +-
.../GroupingKeySelectorTranslationTest.java | 6 +-
.../GroupingPojoTranslationTest.java | 8 +-
.../GroupingTupleTranslationTest.java | 8 +-
.../JoinCustomPartitioningTest.java | 4 +-
.../optimizer/java/PartitionOperatorTest.java | 4 +-
.../testfunctions/IdentityCoGrouper.java | 30 ++
.../testfunctions/IdentityCrosser.java | 32 ++
.../testfunctions/IdentityGroupReducer.java | 2 -
.../IdentityGroupReducerCombinable.java | 37 ++
.../optimizer/testfunctions/IdentityJoiner.java | 32 ++
.../flink/optimizer/util/DummyCoGroupStub.java | 42 --
.../flink/optimizer/util/DummyCrossStub.java | 32 --
.../flink/optimizer/util/DummyInputFormat.java | 42 --
.../flink/optimizer/util/DummyMatchStub.java | 37 --
.../util/DummyNonPreservingMatchStub.java | 35 --
.../flink/optimizer/util/DummyOutputFormat.java | 34 --
.../flink/optimizer/util/IdentityMap.java | 37 --
.../flink/optimizer/util/IdentityReduce.java | 40 --
.../flink/optimizer/util/OperatorResolver.java | 15 +-
46 files changed, 752 insertions(+), 1156 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
index 0c50536..a4e74a9 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
@@ -21,18 +21,14 @@ package org.apache.flink.optimizer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.operators.CrossOperator;
-import org.apache.flink.api.java.record.operators.CrossWithLargeOperator;
-import org.apache.flink.api.java.record.operators.CrossWithSmallOperator;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.util.DummyCrossStub;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.junit.Test;
@@ -41,27 +37,23 @@ import org.junit.Test;
* Tests that validate optimizer choices when using operators that are requesting certain specific execution
* strategies.
*/
-@SuppressWarnings({"serial", "deprecation"})
+@SuppressWarnings({"serial"})
public class AdditionalOperatorsTest extends CompilerTestBase {
@Test
public void testCrossWithSmall() {
// construct the plan
- FileDataSource source1 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 1");
- FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 2");
-
- CrossOperator cross = CrossWithSmallOperator.builder(new DummyCrossStub())
- .input1(source1).input2(source2)
- .name("Cross").build();
-
- FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, cross, "Sink");
-
- Plan plan = new Plan(sink);
- plan.setDefaultParallelism(DEFAULT_PARALLELISM);
-
-
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+ DataSet<Long> set1 = env.generateSequence(0,1);
+ DataSet<Long> set2 = env.generateSequence(0,1);
+
+ set1.crossWithTiny(set2).name("Cross")
+ .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+
try {
- OptimizedPlan oPlan = compileNoStats(plan);
+ JavaPlan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = compileWithStats(plan);
OptimizerPlanNodeResolver resolver = new OptimizerPlanNodeResolver(oPlan);
DualInputPlanNode crossPlanNode = resolver.getNode("Cross");
@@ -72,27 +64,23 @@ public class AdditionalOperatorsTest extends CompilerTestBase {
assertEquals(ShipStrategyType.BROADCAST, in2.getShipStrategy());
} catch(CompilerException ce) {
ce.printStackTrace();
- fail("The pact compiler is unable to compile this plan correctly.");
+ fail("The Flink optimizer is unable to compile this plan correctly.");
}
}
@Test
public void testCrossWithLarge() {
// construct the plan
- FileDataSource source1 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 1");
- FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 2");
-
- CrossOperator cross= CrossWithLargeOperator.builder(new DummyCrossStub())
- .input1(source1).input2(source2)
- .name("Cross").build();
-
- FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, cross, "Sink");
-
- Plan plan = new Plan(sink);
- plan.setDefaultParallelism(DEFAULT_PARALLELISM);
-
-
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+ DataSet<Long> set1 = env.generateSequence(0,1);
+ DataSet<Long> set2 = env.generateSequence(0,1);
+
+ set1.crossWithHuge(set2).name("Cross")
+ .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());;
+
try {
+ JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileNoStats(plan);
OptimizerPlanNodeResolver resolver = new OptimizerPlanNodeResolver(oPlan);
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
index 94ff41a..1d5b7c1 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
@@ -20,15 +20,19 @@ package org.apache.flink.optimizer;
import static org.junit.Assert.*;
-import java.util.ArrayList;
import java.util.HashSet;
-import java.util.List;
import java.util.Set;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.io.TextOutputFormat;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
+import org.apache.flink.optimizer.testfunctions.IdentityCoGrouper;
+import org.apache.flink.optimizer.testfunctions.IdentityCrosser;
+import org.apache.flink.optimizer.testfunctions.IdentityJoiner;
import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.util.Collector;
@@ -41,15 +45,6 @@ import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichJoinFunction;
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.CoGroupOperator;
-import org.apache.flink.api.java.record.operators.CrossOperator;
-import org.apache.flink.api.java.record.operators.DeltaIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SinkPlanNode;
@@ -59,18 +54,8 @@ import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.optimizer.testfunctions.Top1GroupReducer;
-import org.apache.flink.optimizer.util.DummyCoGroupStub;
-import org.apache.flink.optimizer.util.DummyCrossStub;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyMatchStub;
-import org.apache.flink.optimizer.util.DummyNonPreservingMatchStub;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityMap;
-import org.apache.flink.optimizer.util.IdentityReduce;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-
-@SuppressWarnings({"serial", "deprecation"})
+
+@SuppressWarnings({"serial"})
public class BranchingPlansCompilerTest extends CompilerTestBase {
@@ -323,84 +308,53 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
public void testBranchEachContractType() {
try {
// construct the plan
- FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), "file:///test/file1", "Source A");
- FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), "file:///test/file2", "Source B");
- FileDataSource sourceC = new FileDataSource(new DummyInputFormat(), "file:///test/file3", "Source C");
-
- MapOperator map1 = MapOperator.builder(new IdentityMap()).input(sourceA).name("Map 1").build();
-
- ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
- .input(map1)
- .name("Reduce 1")
- .build();
-
- JoinOperator match1 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
- .input1(sourceB, sourceB, sourceC)
- .input2(sourceC)
- .name("Match 1")
- .build();
- ;
- CoGroupOperator cogroup1 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
- .input1(sourceA)
- .input2(sourceB)
- .name("CoGroup 1")
- .build();
-
- CrossOperator cross1 = CrossOperator.builder(new DummyCrossStub())
- .input1(reduce1)
- .input2(cogroup1)
- .name("Cross 1")
- .build();
-
-
- CoGroupOperator cogroup2 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
- .input1(cross1)
- .input2(cross1)
- .name("CoGroup 2")
- .build();
-
- CoGroupOperator cogroup3 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
- .input1(map1)
- .input2(match1)
- .name("CoGroup 3")
- .build();
-
-
- MapOperator map2 = MapOperator.builder(new IdentityMap()).input(cogroup3).name("Map 2").build();
-
- CoGroupOperator cogroup4 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
- .input1(map2)
- .input2(match1)
- .name("CoGroup 4")
- .build();
-
- CoGroupOperator cogroup5 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
- .input1(cogroup2)
- .input2(cogroup1)
- .name("CoGroup 5")
- .build();
-
- CoGroupOperator cogroup6 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
- .input1(reduce1)
- .input2(cogroup4)
- .name("CoGroup 6")
- .build();
-
- CoGroupOperator cogroup7 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
- .input1(cogroup5)
- .input2(cogroup6)
- .name("CoGroup 7")
- .build();
-
- FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, cogroup7);
- sink.addInput(sourceA);
- sink.addInput(cogroup3);
- sink.addInput(cogroup4);
- sink.addInput(cogroup1);
-
- // return the PACT plan
- Plan plan = new Plan(sink, "Branching of each contract type");
-
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+ DataSet<Long> sourceA = env.generateSequence(0,1);
+ DataSet<Long> sourceB = env.generateSequence(0,1);
+ DataSet<Long> sourceC = env.generateSequence(0,1);
+
+ DataSet<Long> map1 = sourceA.map(new IdentityMapper<Long>()).name("Map 1");
+
+ DataSet<Long> reduce1 = map1.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce 1");
+
+ DataSet<Long> join1 = sourceB.union(sourceB).union(sourceC)
+ .join(sourceC).where("*").equalTo("*")
+ .with(new IdentityJoiner<Long>()).name("Join 1");
+
+ DataSet<Long> coGroup1 = sourceA.coGroup(sourceB).where("*").equalTo("*")
+ .with(new IdentityCoGrouper<Long>()).name("CoGroup 1");
+
+ DataSet<Long> cross1 = reduce1.cross(coGroup1)
+ .with(new IdentityCrosser<Long>()).name("Cross 1");
+
+ DataSet<Long> coGroup2 = cross1.coGroup(cross1).where("*").equalTo("*")
+ .with(new IdentityCoGrouper<Long>()).name("CoGroup 2");
+
+ DataSet<Long> coGroup3 = map1.coGroup(join1).where("*").equalTo("*")
+ .with(new IdentityCoGrouper<Long>()).name("CoGroup 3");
+
+ DataSet<Long> map2 = coGroup3.map(new IdentityMapper<Long>()).name("Map 2");
+
+ DataSet<Long> coGroup4 = map2.coGroup(join1).where("*").equalTo("*")
+ .with(new IdentityCoGrouper<Long>()).name("CoGroup 4");
+
+ DataSet<Long> coGroup5 = coGroup2.coGroup(coGroup1).where("*").equalTo("*")
+ .with(new IdentityCoGrouper<Long>()).name("CoGroup 5");
+
+ DataSet<Long> coGroup6 = reduce1.coGroup(coGroup4).where("*").equalTo("*")
+ .with(new IdentityCoGrouper<Long>()).name("CoGroup 6");
+
+ DataSet<Long> coGroup7 = coGroup5.coGroup(coGroup6).where("*").equalTo("*")
+ .with(new IdentityCoGrouper<Long>()).name("CoGroup 7");
+
+ coGroup7.union(sourceA)
+ .union(coGroup3)
+ .union(coGroup4)
+ .union(coGroup1)
+ .output(new DiscardingOutputFormat<Long>());
+
+ JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileNoStats(plan);
JobGraphGenerator jobGen = new JobGraphGenerator();
@@ -418,47 +372,33 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
public void testBranchingUnion() {
try {
// construct the plan
- FileDataSource source1 = new FileDataSource(new DummyInputFormat(), IN_FILE);
- FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE);
-
- JoinOperator mat1 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
- .input1(source1)
- .input2(source2)
- .name("Match 1")
- .build();
-
- MapOperator ma1 = MapOperator.builder(new IdentityMap()).input(mat1).name("Map1").build();
-
- ReduceOperator r1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
- .input(ma1)
- .name("Reduce 1")
- .build();
-
- ReduceOperator r2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
- .input(mat1)
- .name("Reduce 2")
- .build();
-
- MapOperator ma2 = MapOperator.builder(new IdentityMap()).input(mat1).name("Map 2").build();
-
- MapOperator ma3 = MapOperator.builder(new IdentityMap()).input(ma2).name("Map 3").build();
-
- @SuppressWarnings("unchecked")
- JoinOperator mat2 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
- .input1(r1, r2, ma2, ma3)
- .input2(ma2)
- .name("Match 2")
- .build();
- mat2.setParameter(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_MERGE);
-
- FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, mat2);
-
-
- // return the PACT plan
- Plan plan = new Plan(sink, "Branching Union");
-
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+ DataSet<Long> source1 = env.generateSequence(0,1);
+ DataSet<Long> source2 = env.generateSequence(0,1);
+
+ DataSet<Long> join1 = source1.join(source2).where("*").equalTo("*")
+ .with(new IdentityJoiner<Long>()).name("Join 1");
+
+ DataSet<Long> map1 = join1.map(new IdentityMapper<Long>()).name("Map 1");
+
+ DataSet<Long> reduce1 = map1.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce 1");
+
+ DataSet<Long> reduce2 = join1.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce 2");
+
+ DataSet<Long> map2 = join1.map(new IdentityMapper<Long>()).name("Map 2");
+
+ DataSet<Long> map3 = map2.map(new IdentityMapper<Long>()).name("Map 3");
+
+ DataSet<Long> join2 = reduce1.union(reduce2).union(map2).union(map3)
+ .join(map2, JoinHint.REPARTITION_SORT_MERGE).where("*").equalTo("*")
+ .with(new IdentityJoiner<Long>()).name("Join 2");
+
+ join2.output(new DiscardingOutputFormat<Long>());
+
+ JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileNoStats(plan);
-
+
JobGraphGenerator jobGen = new JobGraphGenerator();
//Compile plan to verify that no error is thrown
@@ -480,22 +420,18 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
@Test
public void testBranchingWithMultipleDataSinksSmall() {
try {
+ String outPath1 = "/tmp/out1";
+ String outPath2 = "/tmp/out2";
+
// construct the plan
- final String out1Path = "file:///test/1";
- final String out2Path = "file:///test/2";
-
- FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE);
-
- FileDataSink sinkA = new FileDataSink(DummyOutputFormat.class, out1Path, sourceA);
- FileDataSink sinkB = new FileDataSink(DummyOutputFormat.class, out2Path, sourceA);
-
- List<FileDataSink> sinks = new ArrayList<FileDataSink>();
- sinks.add(sinkA);
- sinks.add(sinkB);
-
- // return the PACT plan
- Plan plan = new Plan(sinks, "Plans With Multiple Data Sinks");
-
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+ DataSet<Long> source1 = env.generateSequence(0,1);
+
+ source1.writeAsText(outPath1);
+ source1.writeAsText(outPath2);
+
+ JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileNoStats(plan);
// ---------- check the optimizer plan ----------
@@ -505,15 +441,16 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
// sinks contain all sink paths
Set<String> allSinks = new HashSet<String>();
- allSinks.add(out1Path);
- allSinks.add(out2Path);
+ allSinks.add(outPath1);
+ allSinks.add(outPath2);
for (SinkPlanNode n : oPlan.getDataSinks()) {
- String path = ((FileDataSink) n.getSinkNode().getOperator()).getFilePath();
+ String path = ((TextOutputFormat<String>)n.getSinkNode().getOperator()
+ .getFormatWrapper().getUserCodeObject()).getOutputFilePath().toString();
Assert.assertTrue("Invalid data sink.", allSinks.remove(path));
}
- // ---------- compile plan to nephele job graph to verify that no error is thrown ----------
+ // ---------- compile plan to job graph to verify that no error is thrown ----------
JobGraphGenerator jobGen = new JobGraphGenerator();
jobGen.compileJobGraph(oPlan);
@@ -541,50 +478,38 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
final String out3Path = "file:///test/3";
final String out4Path = "file:///test/4";
- FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE);
- FileDataSource sourceB = new FileDataSource(DummyInputFormat.class, IN_FILE);
-
- FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, out1Path, sourceA, "1");
- FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, out2Path, sourceB, "2");
- FileDataSink sink3 = new FileDataSink(DummyOutputFormat.class, out3Path, sourceA, "3");
- FileDataSink sink4 = new FileDataSink(DummyOutputFormat.class, out4Path, sourceB, "4");
-
-
- List<FileDataSink> sinks = new ArrayList<FileDataSink>();
- sinks.add(sink1);
- sinks.add(sink2);
- sinks.add(sink3);
- sinks.add(sink4);
-
- // return the PACT plan
- Plan plan = new Plan(sinks, "Disjoint plan with multiple data sinks and branches");
+ // construct the plan
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+ DataSet<Long> sourceA = env.generateSequence(0,1);
+ DataSet<Long> sourceB = env.generateSequence(0,1);
+
+ sourceA.writeAsText(out1Path);
+ sourceB.writeAsText(out2Path);
+ sourceA.writeAsText(out3Path);
+ sourceB.writeAsText(out4Path);
+
+ JavaPlan plan = env.createProgramPlan();
compileNoStats(plan);
+
}
@Test
public void testBranchAfterIteration() {
- FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2");
-
- BulkIteration iteration = new BulkIteration("Loop");
- iteration.setInput(sourceA);
- iteration.setMaximumNumberOfIterations(10);
-
- MapOperator mapper = MapOperator.builder(IdentityMap.class).name("Mapper").input(iteration.getPartialSolution()).build();
- iteration.setNextPartialSolution(mapper);
-
- FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Sink 1");
-
- MapOperator postMap = MapOperator.builder(IdentityMap.class).name("Post Iteration Mapper")
- .input(iteration).build();
-
- FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, postMap, "Sink 2");
-
- List<FileDataSink> sinks = new ArrayList<FileDataSink>();
- sinks.add(sink1);
- sinks.add(sink2);
-
- Plan plan = new Plan(sinks);
-
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+ DataSet<Long> sourceA = env.generateSequence(0,1);
+
+ IterativeDataSet<Long> loopHead = sourceA.iterate(10);
+ DataSet<Long> loopTail = loopHead.map(new IdentityMapper<Long>()).name("Mapper");
+ DataSet<Long> loopRes = loopHead.closeWith(loopTail);
+
+ loopRes.output(new DiscardingOutputFormat<Long>());
+ loopRes.map(new IdentityMapper<Long>())
+ .output(new DiscardingOutputFormat<Long>());;
+
+ JavaPlan plan = env.createProgramPlan();
+
try {
compileNoStats(plan);
}
@@ -596,31 +521,20 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
@Test
public void testBranchBeforeIteration() {
- FileDataSource source1 = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 1");
- FileDataSource source2 = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2");
-
- BulkIteration iteration = new BulkIteration("Loop");
- iteration.setInput(source2);
- iteration.setMaximumNumberOfIterations(10);
-
- MapOperator inMap = MapOperator.builder(new IdentityMap())
- .input(source1)
- .name("In Iteration Map")
- .setBroadcastVariable("BC", iteration.getPartialSolution())
- .build();
-
- iteration.setNextPartialSolution(inMap);
-
- MapOperator postMap = MapOperator.builder(new IdentityMap())
- .input(source1)
- .name("Post Iteration Map")
- .setBroadcastVariable("BC", iteration)
- .build();
-
- FileDataSink sink = new FileDataSink(DummyOutputFormat.class, OUT_FILE, postMap, "Sink");
-
- Plan plan = new Plan(sink);
-
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+ DataSet<Long> source1 = env.generateSequence(0,1);
+ DataSet<Long> source2 = env.generateSequence(0,1);
+
+ IterativeDataSet<Long> loopHead = source2.iterate(10).name("Loop");
+ DataSet<Long> loopTail = source1.map(new IdentityMapper<Long>()).withBroadcastSet(loopHead, "BC").name("In-Loop Mapper");
+ DataSet<Long> loopRes = loopHead.closeWith(loopTail);
+
+ DataSet<Long> map = source1.map(new IdentityMapper<Long>()).withBroadcastSet(loopRes, "BC").name("Post-Loop Mapper");
+ map.output(new DiscardingOutputFormat<Long>());
+
+ JavaPlan plan = env.createProgramPlan();
+
try {
compileNoStats(plan);
}
@@ -644,31 +558,22 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
*/
@Test
public void testClosure() {
- FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 1");
- FileDataSource sourceB = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2");
-
- FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, sourceA, "Sink 1");
- FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, sourceB, "Sink 2");
-
- BulkIteration iteration = new BulkIteration("Loop");
- iteration.setInput(sourceA);
- iteration.setMaximumNumberOfIterations(10);
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+ DataSet<Long> sourceA = env.generateSequence(0,1);
+ DataSet<Long> sourceB = env.generateSequence(0,1);
- CrossOperator stepFunction = CrossOperator.builder(DummyCrossStub.class).name("StepFunction").
- input1(iteration.getPartialSolution()).
- input2(sourceB).
- build();
+ sourceA.output(new DiscardingOutputFormat<Long>());
+ sourceB.output(new DiscardingOutputFormat<Long>());
- iteration.setNextPartialSolution(stepFunction);
+ IterativeDataSet<Long> loopHead = sourceA.iterate(10).name("Loop");
- FileDataSink sink3 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Sink 3");
+ DataSet<Long> loopTail = loopHead.cross(sourceB).with(new IdentityCrosser<Long>());
+ DataSet<Long> loopRes = loopHead.closeWith(loopTail);
- List<FileDataSink> sinks = new ArrayList<FileDataSink>();
- sinks.add(sink1);
- sinks.add(sink2);
- sinks.add(sink3);
+ loopRes.output(new DiscardingOutputFormat<Long>());
- Plan plan = new Plan(sinks);
+ JavaPlan plan = env.createProgramPlan();
try{
compileNoStats(plan);
@@ -691,40 +596,24 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
*/
@Test
public void testClosureDeltaIteration() {
- FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 1");
- FileDataSource sourceB = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2");
- FileDataSource sourceC = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 3");
-
- FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, sourceA, "Sink 1");
- FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, sourceC, "Sink 2");
-
- DeltaIteration iteration = new DeltaIteration(0, "Loop");
- iteration.setInitialSolutionSet(sourceA);
- iteration.setInitialWorkset(sourceB);
- iteration.setMaximumNumberOfIterations(10);
-
- CrossOperator nextWorkset = CrossOperator.builder(DummyCrossStub.class).name("Next workset").
- input1(iteration.getWorkset()).
- input2(sourceC).
- build();
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+ DataSet<Tuple2<Long, Long>> sourceA = env.generateSequence(0,1).map(new Duplicator<Long>());
+ DataSet<Tuple2<Long, Long>> sourceB = env.generateSequence(0,1).map(new Duplicator<Long>());
+ DataSet<Tuple2<Long, Long>> sourceC = env.generateSequence(0,1).map(new Duplicator<Long>());
- JoinOperator solutionSetDelta = JoinOperator.builder(DummyMatchStub.class, LongValue.class,0,0).
- name("Next solution set.").
- input1(nextWorkset).
- input2(iteration.getSolutionSet()).
- build();
+ sourceA.output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
+ sourceC.output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
- iteration.setNextWorkset(nextWorkset);
- iteration.setSolutionSetDelta(solutionSetDelta);
+ DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> loop = sourceA.iterateDelta(sourceB, 10, 0);
- FileDataSink sink3 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Sink 3");
+ DataSet<Tuple2<Long, Long>> workset = loop.getWorkset().cross(sourceB).with(new IdentityCrosser<Tuple2<Long, Long>>()).name("Next work set");
+ DataSet<Tuple2<Long, Long>> delta = workset.join(loop.getSolutionSet()).where(0).equalTo(0).with(new IdentityJoiner<Tuple2<Long, Long>>()).name("Solution set delta");
- List<FileDataSink> sinks = new ArrayList<FileDataSink>();
- sinks.add(sink1);
- sinks.add(sink2);
- sinks.add(sink3);
+ DataSet<Tuple2<Long, Long>> result = loop.closeWith(delta, workset);
+ result.output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
- Plan plan = new Plan(sinks);
+ JavaPlan plan = env.createProgramPlan();
try{
compileNoStats(plan);
@@ -752,44 +641,26 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
*/
@Test
public void testDeltaIterationWithStaticInput() {
- FileDataSource source = new FileDataSource(DummyInputFormat.class, IN_FILE, "source");
-
- MapOperator mappedSource = MapOperator.builder(IdentityMap.class).
- input(source).
- name("Identity mapped source").
- build();
-
- ReduceOperator reducedSource = ReduceOperator.builder(IdentityReduce.class).
- input(source).
- name("Identity reduce source").
- build();
-
- DeltaIteration iteration = new DeltaIteration(0,"Loop");
- iteration.setMaximumNumberOfIterations(10);
- iteration.setInitialSolutionSet(source);
- iteration.setInitialWorkset(mappedSource);
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+ DataSet<Tuple2<Long, Long>> source = env.generateSequence(0,1).map(new Duplicator<Long>());
- JoinOperator nextWorkset = JoinOperator.builder(DummyNonPreservingMatchStub.class, IntValue.class, 0,0).
- input1(iteration.getWorkset()).
- input2(reducedSource).
- name("Next work set").
- build();
+ DataSet<Tuple2<Long,Long>> map = source
+ .map(new IdentityMapper<Tuple2<Long, Long>>());
+ DataSet<Tuple2<Long,Long>> reduce = source
+ .reduceGroup(new IdentityGroupReducer<Tuple2<Long, Long>>());
- JoinOperator solutionSetDelta = JoinOperator.builder(DummyNonPreservingMatchStub.class, IntValue.class, 0,
- 0).
- input1(iteration.getSolutionSet()).
- input2(nextWorkset).
- name("Solution set delta").
- build();
+ DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> loop = source.iterateDelta(map, 10, 0);
- iteration.setNextWorkset(nextWorkset);
- iteration.setSolutionSetDelta(solutionSetDelta);
+ DataSet<Tuple2<Long, Long>> workset = loop.getWorkset().join(reduce).where(0).equalTo(0)
+ .with(new IdentityJoiner<Tuple2<Long, Long>>()).name("Next work set");
+ DataSet<Tuple2<Long, Long>> delta = loop.getSolutionSet().join(workset).where(0).equalTo(0)
+ .with(new IdentityJoiner<Tuple2<Long, Long>>()).name("Solution set delta");
- FileDataSink sink = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Iteration sink");
- List<FileDataSink> sinks = new ArrayList<FileDataSink>();
- sinks.add(sink);
+ DataSet<Tuple2<Long, Long>> result = loop.closeWith(delta, workset);
+ result.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
- Plan plan = new Plan(sinks);
+ JavaPlan plan = env.createProgramPlan();
try{
compileNoStats(plan);
@@ -871,7 +742,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
.withBroadcastSet(input3, "bc1")
.withBroadcastSet(input1, "bc2")
.withBroadcastSet(result1, "bc3")
- .print();
+ .output(new DiscardingOutputFormat<String>());
Plan plan = env.createProgramPlan();
@@ -900,7 +771,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
IterativeDataSet<String> iteration = initialSolution.iterate(100);
iteration.closeWith(iteration.map(new IdentityMapper<String>()).withBroadcastSet(reduced, "red"))
- .print();
+ .output(new DiscardingOutputFormat<String>());
Plan plan = env.createProgramPlan();
@@ -927,9 +798,12 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
IterativeDataSet<String> iteration2 = input.iterate(20);
IterativeDataSet<String> iteration3 = input.iterate(17);
- iteration1.closeWith(iteration1.map(new IdentityMapper<String>()).withBroadcastSet(reduced, "bc1")).print();
- iteration2.closeWith(iteration2.reduceGroup(new Top1GroupReducer<String>()).withBroadcastSet(reduced, "bc2")).print();
- iteration3.closeWith(iteration3.reduceGroup(new IdentityGroupReducer<String>()).withBroadcastSet(reduced, "bc3")).print();
+ iteration1.closeWith(iteration1.map(new IdentityMapper<String>()).withBroadcastSet(reduced, "bc1"))
+ .output(new DiscardingOutputFormat<String>());
+ iteration2.closeWith(iteration2.reduceGroup(new Top1GroupReducer<String>()).withBroadcastSet(reduced, "bc2"))
+ .output(new DiscardingOutputFormat<String>());
+ iteration3.closeWith(iteration3.reduceGroup(new IdentityGroupReducer<String>()).withBroadcastSet(reduced, "bc3"))
+ .output(new DiscardingOutputFormat<String>());
Plan plan = env.createProgramPlan();
@@ -953,9 +827,12 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
IterativeDataSet<String> iteration3 = input.iterate(17);
- iteration1.closeWith(iteration1.map(new IdentityMapper<String>())).print();
- iteration2.closeWith(iteration2.reduceGroup(new Top1GroupReducer<String>())).print();
- iteration3.closeWith(iteration3.reduceGroup(new IdentityGroupReducer<String>())).print();
+ iteration1.closeWith(iteration1.map(new IdentityMapper<String>()))
+ .output(new DiscardingOutputFormat<String>());
+ iteration2.closeWith(iteration2.reduceGroup(new Top1GroupReducer<String>()))
+ .output(new DiscardingOutputFormat<String>());
+ iteration3.closeWith(iteration3.reduceGroup(new IdentityGroupReducer<String>()))
+ .output(new DiscardingOutputFormat<String>());
Plan plan = env.createProgramPlan();
@@ -979,7 +856,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
input
.map(new IdentityMapper<Long>()).withBroadcastSet(bc_input, "name1")
.map(new IdentityMapper<Long>()).withBroadcastSet(bc_input, "name2")
- .print();
+ .output(new DiscardingOutputFormat<Long>());
Plan plan = env.createProgramPlan();
compileNoStats(plan);
@@ -1019,7 +896,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
.map(new IdentityMapper<Tuple2<Long,Long>>())
.withBroadcastSet(bc_input1, "bc1")
.union(joinResult)
- .print();
+ .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
Plan plan = env.createProgramPlan();
compileNoStats(plan);
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java
index 57c53ff..b0ecfe5 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java
@@ -22,6 +22,7 @@ package org.apache.flink.optimizer;
import static org.junit.Assert.*;
import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.junit.Test;
import org.apache.flink.api.java.DataSet;
@@ -43,7 +44,8 @@ public class BroadcastVariablePipelinebreakerTest extends CompilerTestBase {
DataSet<String> source1 = env.fromElements("test");
DataSet<String> source2 = env.fromElements("test");
- source1.map(new IdentityMapper<String>()).withBroadcastSet(source2, "some name").print();
+ source1.map(new IdentityMapper<String>()).withBroadcastSet(source2, "some name")
+ .output(new DiscardingOutputFormat<String>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -66,7 +68,8 @@ public class BroadcastVariablePipelinebreakerTest extends CompilerTestBase {
DataSet<String> source1 = env.fromElements("test");
- source1.map(new IdentityMapper<String>()).map(new IdentityMapper<String>()).withBroadcastSet(source1, "some name").print();
+ source1.map(new IdentityMapper<String>()).map(new IdentityMapper<String>()).withBroadcastSet(source1, "some name")
+ .output(new DiscardingOutputFormat<String>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
index 1a4cd18..e795508 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
@@ -22,6 +22,7 @@ package org.apache.flink.optimizer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.junit.Test;
import org.apache.flink.api.common.Plan;
@@ -215,7 +216,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
Configuration joinStrategy = new Configuration();
joinStrategy.setString(Optimizer.HINT_SHIP_STRATEGY, Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH);
- if(strategy != "") {
+ if(!strategy.equals("")) {
joinStrategy.setString(Optimizer.HINT_LOCAL_STRATEGY, strategy);
}
@@ -223,7 +224,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
DataSet<Tuple3<Long, Long, Long>> output = iteration.closeWith(inner);
- output.print();
+ output.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
return env.createProgramPlan();
@@ -250,7 +251,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
DataSet<Tuple3<Long, Long, Long>> output = iteration.closeWith(inner);
- output.print();
+ output.output(new DiscardingOutputFormat<Tuple3<Long,Long,Long>>());
return env.createProgramPlan();
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java
index 61d407a..f066b36 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.optimizer;
import org.apache.flink.api.common.functions.RichCoGroupFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.junit.Assert;
import org.junit.Test;
@@ -64,7 +65,7 @@ public class CoGroupSolutionSetFirstTest extends CompilerTestBase {
DataSet<Tuple1<Integer>> feedback = iteration.getWorkset().map(new SimpleMap());
DataSet<Tuple1<Integer>> result = iteration.closeWith(delta, feedback);
- result.print();
+ result.output(new DiscardingOutputFormat<Tuple1<Integer>>());
Plan plan = env.createProgramPlan();
OptimizedPlan oPlan = null;
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
index bb3aa47..68953c0 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.*;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.optimizer.util.CompilerTestBase;
@@ -36,8 +37,10 @@ public class DisjointDataFlowsTest extends CompilerTestBase {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// generate two different flows
- env.generateSequence(1, 10).print();
- env.generateSequence(1, 10).print();
+ env.generateSequence(1, 10)
+ .output(new DiscardingOutputFormat<Long>());
+ env.generateSequence(1, 10)
+ .output(new DiscardingOutputFormat<Long>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
index 7865861..5827d9c 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DistinctOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.plan.OptimizedPlan;
@@ -50,7 +51,7 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
data
.distinct().name("reducer")
- .print().name("sink");
+ .output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -85,7 +86,6 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
assertEquals(8, sinkNode.getParallelism());
}
catch (Exception e) {
- System.err.println(e.getMessage());
e.printStackTrace();
fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
}
@@ -104,7 +104,7 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
.distinct(new KeySelector<Tuple2<String,Double>, String>() {
public String getKey(Tuple2<String, Double> value) { return value.f0; }
}).name("reducer")
- .print().name("sink");
+ .output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -146,7 +146,6 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
assertEquals(8, sinkNode.getParallelism());
}
catch (Exception e) {
- System.err.println(e.getMessage());
e.printStackTrace();
fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
}
@@ -164,7 +163,7 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
DistinctOperator<Tuple2<String, Double>> reduced = data
.distinct(1).name("reducer");
- reduced.print().name("sink");
+ reduced.output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -199,7 +198,6 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
assertEquals(8, sinkNode.getParallelism());
}
catch (Exception e) {
- System.err.println(e.getMessage());
e.printStackTrace();
fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java
index 76b3b0e..7328423 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java
@@ -20,30 +20,24 @@ package org.apache.flink.optimizer;
import static org.junit.Assert.fail;
-import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.record.operators.CoGroupOperator;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.util.DummyCoGroupStub;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityReduce;
+import org.apache.flink.optimizer.testfunctions.IdentityCoGrouper;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.StringValue;
import org.junit.Assert;
import org.junit.Test;
@@ -51,25 +45,24 @@ import org.junit.Test;
* This test case has been created to validate that correct strategies are used if orders within groups are
* requested.
*/
-@SuppressWarnings({"serial", "deprecation"})
+@SuppressWarnings({"serial"})
public class GroupOrderTest extends CompilerTestBase {
@Test
public void testReduceWithGroupOrder() {
// construct the plan
- FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
-
- ReduceOperator reduce = ReduceOperator.builder(new IdentityReduce()).keyField(IntValue.class, 2).name("Reduce").input(source).build();
- Ordering groupOrder = new Ordering(5, StringValue.class, Order.DESCENDING);
- reduce.setGroupOrder(groupOrder);
-
- FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, reduce, "Sink");
-
-
- Plan plan = new Plan(sink, "Test Temp Task");
- plan.setDefaultParallelism(DEFAULT_PARALLELISM);
-
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+ DataSet<Tuple4<Long, Long, Long, Long>> set1 = env.readCsvFile("/tmp/fake.csv")
+ .types(Long.class, Long.class, Long.class, Long.class);
+
+ set1.groupBy(1).sortGroup(3, Order.DESCENDING)
+ .reduceGroup(new IdentityGroupReducer<Tuple4<Long, Long, Long, Long>>()).name("Reduce")
+ .output(new DiscardingOutputFormat<Tuple4<Long, Long, Long, Long>>()).name("Sink");
+
+ JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan;
+
try {
oPlan = compileNoStats(plan);
} catch(CompilerException ce) {
@@ -89,38 +82,35 @@ public class GroupOrderTest extends CompilerTestBase {
Channel c = reducer.getInput();
Assert.assertEquals(LocalStrategy.SORT, c.getLocalStrategy());
- FieldList ship = new FieldList(2);
- FieldList local = new FieldList(2, 5);
+ FieldList ship = new FieldList(1);
+ FieldList local = new FieldList(1, 3);
Assert.assertEquals(ship, c.getShipStrategyKeys());
Assert.assertEquals(local, c.getLocalStrategyKeys());
Assert.assertTrue(c.getLocalStrategySortOrder()[0] == reducer.getSortOrders(0)[0]);
// check that we indeed sort descending
- Assert.assertTrue(c.getLocalStrategySortOrder()[1] == groupOrder.getFieldSortDirections()[0]);
+ Assert.assertEquals(false, c.getLocalStrategySortOrder()[1]);
}
@Test
public void testCoGroupWithGroupOrder() {
// construct the plan
- FileDataSource source1 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source1");
- FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source2");
-
- CoGroupOperator coGroup = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 3, 6)
- .keyField(LongValue.class, 0, 0)
- .name("CoGroup").input1(source1).input2(source2).build();
-
- Ordering groupOrder1 = new Ordering(5, StringValue.class, Order.DESCENDING);
- Ordering groupOrder2 = new Ordering(1, StringValue.class, Order.DESCENDING);
- groupOrder2.appendOrdering(4, DoubleValue.class, Order.ASCENDING);
- coGroup.setGroupOrderForInputOne(groupOrder1);
- coGroup.setGroupOrderForInputTwo(groupOrder2);
-
- FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, coGroup, "Sink");
-
- Plan plan = new Plan(sink, "Reduce Group Order Test");
- plan.setDefaultParallelism(DEFAULT_PARALLELISM);
-
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+ DataSet<Tuple7<Long, Long, Long, Long, Long, Long, Long>> set1 = env.readCsvFile("/tmp/fake1.csv")
+ .types(Long.class, Long.class, Long.class, Long.class, Long.class, Long.class, Long.class);
+ DataSet<Tuple7<Long, Long, Long, Long, Long, Long, Long>> set2 = env.readCsvFile("/tmp/fake2.csv")
+ .types(Long.class, Long.class, Long.class, Long.class, Long.class, Long.class, Long.class);
+
+ set1.coGroup(set2).where(3,0).equalTo(6,0)
+ .sortFirstGroup(5, Order.DESCENDING)
+ .sortSecondGroup(1, Order.DESCENDING).sortSecondGroup(4, Order.ASCENDING)
+ .with(new IdentityCoGrouper<Tuple7<Long, Long, Long, Long, Long, Long, Long>>()).name("CoGroup")
+ .output(new DiscardingOutputFormat<Tuple7<Long, Long, Long, Long, Long, Long, Long>>()).name("Sink");
+
+ JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan;
+
try {
oPlan = compileNoStats(plan);
} catch(CompilerException ce) {
@@ -144,11 +134,11 @@ public class GroupOrderTest extends CompilerTestBase {
Assert.assertEquals(LocalStrategy.SORT, c1.getLocalStrategy());
Assert.assertEquals(LocalStrategy.SORT, c2.getLocalStrategy());
- FieldList ship1 = new FieldList(new int[] {3, 0});
- FieldList ship2 = new FieldList(new int[] {6, 0});
+ FieldList ship1 = new FieldList(3, 0);
+ FieldList ship2 = new FieldList(6, 0);
- FieldList local1 = new FieldList(new int[] {3, 0, 5});
- FieldList local2 = new FieldList(new int[] {6, 0, 1, 4});
+ FieldList local1 = new FieldList(3, 0, 5);
+ FieldList local2 = new FieldList(6, 0, 1, 4);
Assert.assertEquals(ship1, c1.getShipStrategyKeys());
Assert.assertEquals(ship2, c2.getShipStrategyKeys());
@@ -161,8 +151,8 @@ public class GroupOrderTest extends CompilerTestBase {
Assert.assertTrue(c2.getLocalStrategySortOrder()[1] == coGroupNode.getSortOrders()[1]);
// check that the local group orderings are correct
- Assert.assertTrue(c1.getLocalStrategySortOrder()[2] == groupOrder1.getFieldSortDirections()[0]);
- Assert.assertTrue(c2.getLocalStrategySortOrder()[2] == groupOrder2.getFieldSortDirections()[0]);
- Assert.assertTrue(c2.getLocalStrategySortOrder()[3] == groupOrder2.getFieldSortDirections()[1]);
+ Assert.assertEquals(false, c1.getLocalStrategySortOrder()[2]);
+ Assert.assertEquals(false, c2.getLocalStrategySortOrder()[2]);
+ Assert.assertEquals(true, c2.getLocalStrategySortOrder()[3]);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
index 52e9a2d..adca504 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
@@ -18,21 +18,16 @@
package org.apache.flink.optimizer;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.operators.CrossOperator;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.optimizer.util.DummyCrossStub;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityMap;
-import org.apache.flink.optimizer.util.IdentityReduce;
+import org.apache.flink.optimizer.testfunctions.IdentityCrosser;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.types.IntValue;
import org.junit.Test;
/**
@@ -41,7 +36,7 @@ import org.junit.Test;
* <li> Ticket 158
* </ul>
*/
-@SuppressWarnings({"serial", "deprecation"})
+@SuppressWarnings({"serial"})
public class HardPlansCompilationTest extends CompilerTestBase {
/**
@@ -54,27 +49,21 @@ public class HardPlansCompilationTest extends CompilerTestBase {
@Test
public void testTicket158() {
// construct the plan
- FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
-
- MapOperator map = MapOperator.builder(new IdentityMap()).name("Map1").input(source).build();
-
- ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce1").input(map).build();
-
- CrossOperator cross1 = CrossOperator.builder(new DummyCrossStub()).name("Cross1").input1(reduce1).input2(source).build();
-
- ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce2").input(cross1).build();
-
- CrossOperator cross2 = CrossOperator.builder(new DummyCrossStub()).name("Cross2").input1(reduce2).input2(source).build();
-
- ReduceOperator reduce3 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce3").input(cross2).build();
-
- FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
- sink.setInput(reduce3);
-
- Plan plan = new Plan(sink, "Test Temp Task");
- plan.setDefaultParallelism(DEFAULT_PARALLELISM);
-
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+ DataSet<Long> set1 = env.generateSequence(0,1);
+
+ set1.map(new IdentityMapper<Long>()).name("Map1")
+ .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce1")
+ .cross(set1).with(new IdentityCrosser<Long>()).withForwardedFieldsFirst("*").name("Cross1")
+ .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce2")
+ .cross(set1).with(new IdentityCrosser<Long>()).name("Cross2")
+ .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce3")
+ .output(new DiscardingOutputFormat<Long>()).name("Sink");
+
+ JavaPlan plan = env.createProgramPlan();
OptimizedPlan oPlan = compileNoStats(plan);
+
JobGraphGenerator jobGen = new JobGraphGenerator();
jobGen.compileJobGraph(oPlan);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
index 0afbe93..269be6e 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.optimizer;
import static org.junit.Assert.*;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.junit.Test;
import org.apache.flink.api.common.Plan;
@@ -72,7 +73,8 @@ public class IterationsCompilerTest extends CompilerTestBase {
.map(new IdentityMapper<Tuple2<Long, Long>>()).withBroadcastSet(iter.getWorkset(), "bc data")
.join(iter.getSolutionSet()).where(0).equalTo(1).projectFirst(1).projectSecond(1);
- iter.closeWith(result.map(new IdentityMapper<Tuple2<Long,Long>>()), result).print();
+ iter.closeWith(result.map(new IdentityMapper<Tuple2<Long,Long>>()), result)
+ .output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
OptimizedPlan p = compileNoStats(env.createProgramPlan());
@@ -104,7 +106,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
DataSet<Tuple2<Long, Long>> depResult = doDeltaIteration(mappedBulk, edges);
- depResult.print();
+ depResult.output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -120,7 +122,6 @@ public class IterationsCompilerTest extends CompilerTestBase {
new JobGraphGenerator().compileJobGraph(op);
}
catch (Exception e) {
- System.err.println(e.getMessage());
e.printStackTrace();
fail(e.getMessage());
}
@@ -140,7 +141,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
DataSet<Tuple2<Long, Long>> depResult = doDeltaIteration(bulkResult, edges);
- depResult.print();
+ depResult.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -156,7 +157,6 @@ public class IterationsCompilerTest extends CompilerTestBase {
new JobGraphGenerator().compileJobGraph(op);
}
catch (Exception e) {
- System.err.println(e.getMessage());
e.printStackTrace();
fail(e.getMessage());
}
@@ -176,7 +176,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
DataSet<Tuple2<Long, Long>> secondResult = doDeltaIteration(firstResult, edges);
- secondResult.print();
+ secondResult.output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -192,7 +192,6 @@ public class IterationsCompilerTest extends CompilerTestBase {
new JobGraphGenerator().compileJobGraph(op);
}
catch (Exception e) {
- System.err.println(e.getMessage());
e.printStackTrace();
fail(e.getMessage());
}
@@ -208,7 +207,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
DataSet<Tuple2<Long, Long>> input2 = env.readCsvFile("/some/file/path").types(Long.class, Long.class);
- doBulkIteration(input1, input2).print();
+ doBulkIteration(input1, input2).output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -226,7 +225,6 @@ public class IterationsCompilerTest extends CompilerTestBase {
new JobGraphGenerator().compileJobGraph(op);
}
catch (Exception e) {
- System.err.println(e.getMessage());
e.printStackTrace();
fail(e.getMessage());
}
@@ -253,7 +251,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
initialWorkset
.join(result, JoinHint.REPARTITION_HASH_FIRST)
.where(0).equalTo(0)
- .print();
+ .output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());
Plan p = env.createProgramPlan();
compileNoStats(p);
@@ -295,7 +293,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
DataSet<Long> result = iteration.closeWith(width.union(update).union(lastGradient));
- result.print();
+ result.output(new DiscardingOutputFormat<Long>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -348,7 +346,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
.flatMap(new FlatMapJoin());
DataSet<Tuple2<Long, Long>> depResult = depIteration.closeWith(updatedComponentId, updatedComponentId);
-
+
return depResult;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
index 34fc085..3a51451 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.*;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -52,7 +53,7 @@ public class NestedIterationsTest extends CompilerTestBase {
DataSet<Long> outerResult = outerIteration.closeWith(innerResult.map(new IdentityMapper<Long>()));
- outerResult.print();
+ outerResult.output(new DiscardingOutputFormat<Long>());
Plan p = env.createProgramPlan();
@@ -88,7 +89,7 @@ public class NestedIterationsTest extends CompilerTestBase {
DataSet<Tuple2<Long, Long>> outerResult = outerIteration.closeWith(innerResult, innerResult);
- outerResult.print();
+ outerResult.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
Plan p = env.createProgramPlan();
@@ -126,7 +127,7 @@ public class NestedIterationsTest extends CompilerTestBase {
DataSet<Long> mainResult = mainIteration.closeWith(joined);
- mainResult.print();
+ mainResult.output(new DiscardingOutputFormat<Long>());
Plan p = env.createProgramPlan();
@@ -164,7 +165,7 @@ public class NestedIterationsTest extends CompilerTestBase {
DataSet<Tuple2<Long, Long>> mainResult = mainIteration.closeWith(joined, joined);
- mainResult.print();
+ mainResult.output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
Plan p = env.createProgramPlan();
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java
index 8236f10..9ddff33 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java
@@ -17,6 +17,10 @@
*/
package org.apache.flink.optimizer;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
@@ -24,22 +28,13 @@ import org.apache.flink.optimizer.plan.PlanNode;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyMatchStub;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityMap;
-import org.apache.flink.optimizer.util.IdentityReduce;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityJoiner;
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.junit.Assert;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.types.IntValue;
import org.apache.flink.util.Visitor;
import org.junit.Test;
@@ -50,7 +45,7 @@ import org.junit.Test;
* parallelism between tasks is increased or decreased.
* </ul>
*/
-@SuppressWarnings({"serial", "deprecation"})
+@SuppressWarnings({"serial"})
public class ParallelismChangeTest extends CompilerTestBase {
/**
@@ -62,34 +57,24 @@ public class ParallelismChangeTest extends CompilerTestBase {
*/
@Test
public void checkPropertyHandlingWithIncreasingGlobalParallelism1() {
- final int degOfPar = DEFAULT_PARALLELISM;
-
+ final int p = DEFAULT_PARALLELISM;
+
// construct the plan
- FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
- source.setParallelism(degOfPar);
-
- MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
- map1.setParallelism(degOfPar);
- map1.setInput(source);
-
- ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
- reduce1.setParallelism(degOfPar);
- reduce1.setInput(map1);
-
- MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
- map2.setParallelism(degOfPar * 2);
- map2.setInput(reduce1);
-
- ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
- reduce2.setParallelism(degOfPar * 2);
- reduce2.setInput(map2);
-
- FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
- sink.setParallelism(degOfPar * 2);
- sink.setInput(reduce2);
-
- Plan plan = new Plan(sink, "Test Increasing parallelism");
-
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(p);
+ DataSet<Long> set1 = env.generateSequence(0,1).setParallelism(p);
+
+ set1.map(new IdentityMapper<Long>())
+ .withForwardedFields("*").setParallelism(p).name("Map1")
+ .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
+ .withForwardedFields("*").setParallelism(p).name("Reduce1")
+ .map(new IdentityMapper<Long>())
+ .withForwardedFields("*").setParallelism(p * 2).name("Map2")
+ .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
+ .withForwardedFields("*").setParallelism(p * 2).name("Reduce2")
+ .output(new DiscardingOutputFormat<Long>()).setParallelism(p * 2).name("Sink");
+
+ JavaPlan plan = env.createProgramPlan();
// submit the plan to the compiler
OptimizedPlan oPlan = compileNoStats(plan);
@@ -116,33 +101,24 @@ public class ParallelismChangeTest extends CompilerTestBase {
*/
@Test
public void checkPropertyHandlingWithIncreasingGlobalParallelism2() {
- final int degOfPar = DEFAULT_PARALLELISM;
-
+ final int p = DEFAULT_PARALLELISM;
+
// construct the plan
- FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
- source.setParallelism(degOfPar);
-
- MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
- map1.setParallelism(degOfPar);
- map1.setInput(source);
-
- ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
- reduce1.setParallelism(degOfPar);
- reduce1.setInput(map1);
-
- MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
- map2.setParallelism(degOfPar);
- map2.setInput(reduce1);
-
- ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
- reduce2.setParallelism(degOfPar * 2);
- reduce2.setInput(map2);
-
- FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
- sink.setParallelism(degOfPar * 2);
- sink.setInput(reduce2);
-
- Plan plan = new Plan(sink, "Test Increasing parallelism");
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(p);
+ DataSet<Long> set1 = env.generateSequence(0,1).setParallelism(p);
+
+ set1.map(new IdentityMapper<Long>())
+ .withForwardedFields("*").setParallelism(p).name("Map1")
+ .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
+ .withForwardedFields("*").setParallelism(p).name("Reduce1")
+ .map(new IdentityMapper<Long>())
+ .withForwardedFields("*").setParallelism(p).name("Map2")
+ .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
+ .withForwardedFields("*").setParallelism(p * 2).name("Reduce2")
+ .output(new DiscardingOutputFormat<Long>()).setParallelism(p * 2).name("Sink");
+
+ JavaPlan plan = env.createProgramPlan();
// submit the plan to the compiler
OptimizedPlan oPlan = compileNoStats(plan);
@@ -170,34 +146,24 @@ public class ParallelismChangeTest extends CompilerTestBase {
*/
@Test
public void checkPropertyHandlingWithIncreasingLocalParallelism() {
- final int degOfPar = 2 * DEFAULT_PARALLELISM;
-
+ final int p = DEFAULT_PARALLELISM * 2;
+
// construct the plan
- FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
- source.setParallelism(degOfPar);
-
- MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
- map1.setParallelism(degOfPar);
- map1.setInput(source);
-
- ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
- reduce1.setParallelism(degOfPar);
- reduce1.setInput(map1);
-
- MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
- map2.setParallelism(degOfPar * 2);
- map2.setInput(reduce1);
-
- ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
- reduce2.setParallelism(degOfPar * 2);
- reduce2.setInput(map2);
-
- FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
- sink.setParallelism(degOfPar * 2);
- sink.setInput(reduce2);
-
- Plan plan = new Plan(sink, "Test Increasing parallelism");
-
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(p);
+ DataSet<Long> set1 = env.generateSequence(0,1).setParallelism(p);
+
+ set1.map(new IdentityMapper<Long>())
+ .withForwardedFields("*").setParallelism(p).name("Map1")
+ .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
+ .withForwardedFields("*").setParallelism(p).name("Reduce1")
+ .map(new IdentityMapper<Long>())
+ .withForwardedFields("*").setParallelism(p * 2).name("Map2")
+ .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
+ .withForwardedFields("*").setParallelism(p * 2).name("Reduce2")
+ .output(new DiscardingOutputFormat<Long>()).setParallelism(p * 2).name("Sink");
+
+ JavaPlan plan = env.createProgramPlan();
// submit the plan to the compiler
OptimizedPlan oPlan = compileNoStats(plan);
@@ -217,38 +183,27 @@ public class ParallelismChangeTest extends CompilerTestBase {
(ShipStrategyType.PARTITION_HASH == mapIn && ShipStrategyType.FORWARD == reduceIn));
}
-
-
@Test
public void checkPropertyHandlingWithDecreasingParallelism() {
- final int degOfPar = DEFAULT_PARALLELISM;
-
+ final int p = DEFAULT_PARALLELISM;
+
// construct the plan
- FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
- source.setParallelism(degOfPar * 2);
-
- MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
- map1.setParallelism(degOfPar * 2);
- map1.setInput(source);
-
- ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
- reduce1.setParallelism(degOfPar * 2);
- reduce1.setInput(map1);
-
- MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
- map2.setParallelism(degOfPar);
- map2.setInput(reduce1);
-
- ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
- reduce2.setParallelism(degOfPar);
- reduce2.setInput(map2);
-
- FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
- sink.setParallelism(degOfPar);
- sink.setInput(reduce2);
-
- Plan plan = new Plan(sink, "Test Increasing parallelism");
-
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(p);
+
+ env
+ .generateSequence(0, 1).setParallelism(p * 2)
+ .map(new IdentityMapper<Long>())
+ .withForwardedFields("*").setParallelism(p * 2).name("Map1")
+ .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
+ .withForwardedFields("*").setParallelism(p * 2).name("Reduce1")
+ .map(new IdentityMapper<Long>())
+ .withForwardedFields("*").setParallelism(p).name("Map2")
+ .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
+ .withForwardedFields("*").setParallelism(p).name("Reduce2")
+ .output(new DiscardingOutputFormat<Long>()).setParallelism(p).name("Sink");
+
+ JavaPlan plan = env.createProgramPlan();
// submit the plan to the compiler
OptimizedPlan oPlan = compileNoStats(plan);
@@ -284,40 +239,29 @@ public class ParallelismChangeTest extends CompilerTestBase {
*/
@Test
public void checkPropertyHandlingWithTwoInputs() {
+
// construct the plan
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
- FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), IN_FILE);
- FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), IN_FILE);
-
- ReduceOperator redA = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
- .input(sourceA)
- .build();
- ReduceOperator redB = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
- .input(sourceB)
- .build();
-
- JoinOperator mat = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
- .input1(redA)
- .input2(redB)
- .build();
-
- FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, mat);
-
- sourceA.setParallelism(5);
- sourceB.setParallelism(7);
- redA.setParallelism(5);
- redB.setParallelism(7);
-
- mat.setParallelism(5);
-
- sink.setParallelism(5);
-
-
- // return the PACT plan
- Plan plan = new Plan(sink, "Partition on DoP Change");
-
+ DataSet<Long> set1 = env.generateSequence(0,1).setParallelism(5);
+ DataSet<Long> set2 = env.generateSequence(0,1).setParallelism(7);
+
+ DataSet<Long> reduce1 = set1
+ .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
+ .withForwardedFields("*").setParallelism(5);
+ DataSet<Long> reduce2 = set2
+ .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
+ .withForwardedFields("*").setParallelism(7);
+
+ reduce1.join(reduce2).where("*").equalTo("*")
+ .with(new IdentityJoiner<Long>()).setParallelism(5)
+ .output(new DiscardingOutputFormat<Long>()).setParallelism(5);
+
+ JavaPlan plan = env.createProgramPlan();
+ // submit the plan to the compiler
OptimizedPlan oPlan = compileNoStats(plan);
-
+
JobGraphGenerator jobGen = new JobGraphGenerator();
//Compile plan to verify that no error is thrown
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java
index 72effc1..365726d 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
@@ -46,7 +47,7 @@ public class PartitionPushdownTest extends CompilerTestBase {
input
.groupBy(0, 1).sum(2)
.groupBy(0).sum(1)
- .print();
+ .output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -82,7 +83,7 @@ public class PartitionPushdownTest extends CompilerTestBase {
input
.groupBy(0).sum(1)
.groupBy(0, 1).sum(2)
- .print();
+ .output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
[2/4] flink git commit: [FLINK-1682] Ported optimizer unit tests from
Record API to Java API
Posted by fh...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/OperatorResolver.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/OperatorResolver.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/OperatorResolver.java
index 920b713..fe0a533 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/OperatorResolver.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/OperatorResolver.java
@@ -29,14 +29,13 @@ import java.util.Set;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.DeltaIteration;
+import org.apache.flink.api.common.operators.base.BulkIterationBase;
+import org.apache.flink.api.common.operators.base.DeltaIterationBase;
import org.apache.flink.util.Visitor;
/**
* Utility to get operator instances from plans via name.
*/
-@SuppressWarnings("deprecation")
public class OperatorResolver implements Visitor<Operator<?>> {
private final Map<String, List<Operator<?>>> map;
@@ -109,11 +108,11 @@ public class OperatorResolver implements Visitor<Operator<?>> {
list.add(visitable);
// recurse into bulk iterations
- if (visitable instanceof BulkIteration) {
- ((BulkIteration) visitable).getNextPartialSolution().accept(this);
- } else if (visitable instanceof DeltaIteration) {
- ((DeltaIteration) visitable).getSolutionSetDelta().accept(this);
- ((DeltaIteration) visitable).getNextWorkset().accept(this);
+ if (visitable instanceof BulkIterationBase) {
+ ((BulkIterationBase) visitable).getNextPartialSolution().accept(this);
+ } else if (visitable instanceof DeltaIterationBase) {
+ ((DeltaIterationBase) visitable).getSolutionSetDelta().accept(this);
+ ((DeltaIterationBase) visitable).getNextWorkset().accept(this);
}
return true;