Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:06:45 UTC
[06/53] [abbrv] flink git commit: [optimizer] Rename optimizer project to "flink-optimizer" (previously flink-compiler)
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
new file mode 100644
index 0000000..e65758f
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
+import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
+import org.junit.Test;
+
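+/**
+ * Tests the optimizer's handling of nested iterations: truly nested bulk and delta
+ * iterations must be rejected with a CompilerException, while an iteration that is
+ * merely referenced from the closure of another iteration must compile and
+ * translate to a job graph.
+ */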
+@SuppressWarnings({"serial", "unchecked"})
+public class NestedIterationsTest extends CompilerTestBase {
+
+ @Test
+ public void testRejectNestedBulkIterations() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Long> data = env.generateSequence(1, 100);
+
+ IterativeDataSet<Long> outerIteration = data.iterate(100);
+
+ IterativeDataSet<Long> innerIteration = outerIteration.map(new IdentityMapper<Long>()).iterate(100);
+
+ DataSet<Long> innerResult = innerIteration.closeWith(innerIteration.map(new IdentityMapper<Long>()));
+
+ DataSet<Long> outerResult = outerIteration.closeWith(innerResult.map(new IdentityMapper<Long>()));
+
+ outerResult.print();
+
+ Plan p = env.createProgramPlan();
+
+ try {
+ compileNoStats(p);
+ fail("The optimizer should reject nested bulk iterations.");
+ }
+ catch (CompilerException e) {
+ assertTrue(e.getMessage().toLowerCase().contains("nested iterations"));
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testRejectNestedWorksetIterations() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple2<Long, Long>> data = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+
+ DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> outerIteration = data.iterateDelta(data, 100, 0);
+
+ DataSet<Tuple2<Long, Long>> inOuter = outerIteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>());
+
+ DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> innerIteration = inOuter.iterateDelta(inOuter, 100, 0);
+
+ DataSet<Tuple2<Long, Long>> inInner = innerIteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>());
+
+ DataSet<Tuple2<Long, Long>> innerResult = innerIteration.closeWith(inInner, inInner).map(new IdentityMapper<Tuple2<Long,Long>>());
+
+ DataSet<Tuple2<Long, Long>> outerResult = outerIteration.closeWith(innerResult, innerResult);
+
+ outerResult.print();
+
+ Plan p = env.createProgramPlan();
+
+ try {
+ compileNoStats(p);
+ fail("The optimizer should reject nested workset iterations.");
+ }
+ catch (CompilerException e) {
+ assertTrue(e.getMessage().toLowerCase().contains("nested iterations"));
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
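+ // The remaining tests use an iteration only in the closure of another iteration
+ // (the first iteration completes before the second consumes its result), which
+ // is not nesting and must be supported.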
+ @Test
+ public void testBulkIterationInClosure() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Long> data1 = env.generateSequence(1, 100);
+ DataSet<Long> data2 = env.generateSequence(1, 100);
+
+ IterativeDataSet<Long> firstIteration = data1.iterate(100);
+
+ DataSet<Long> firstResult = firstIteration.closeWith(firstIteration.map(new IdentityMapper<Long>()));
+
+
+ IterativeDataSet<Long> mainIteration = data2.map(new IdentityMapper<Long>()).iterate(100);
+
+ DataSet<Long> joined = mainIteration.join(firstResult)
+ .where(new IdentityKeyExtractor<Long>()).equalTo(new IdentityKeyExtractor<Long>())
+ .with(new DummyFlatJoinFunction<Long>());
+
+ DataSet<Long> mainResult = mainIteration.closeWith(joined);
+
+ mainResult.print();
+
+ Plan p = env.createProgramPlan();
+
+ // optimizer should be able to translate this
+ OptimizedPlan op = compileNoStats(p);
+
+ // job graph generator should be able to translate this
+ new JobGraphGenerator().compileJobGraph(op);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testDeltaIterationInClosure() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple2<Long, Long>> data1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+ DataSet<Tuple2<Long, Long>> data2 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+
+ DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> firstIteration = data1.iterateDelta(data1, 100, 0);
+
+ DataSet<Tuple2<Long, Long>> inFirst = firstIteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>());
+
+ DataSet<Tuple2<Long, Long>> firstResult = firstIteration.closeWith(inFirst, inFirst).map(new IdentityMapper<Tuple2<Long,Long>>());
+
+
+ DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> mainIteration = data2.iterateDelta(data2, 100, 0);
+
+ DataSet<Tuple2<Long, Long>> joined = mainIteration.getWorkset().join(firstResult).where(0).equalTo(0)
+ .projectFirst(0).projectSecond(0);
+
+ DataSet<Tuple2<Long, Long>> mainResult = mainIteration.closeWith(joined, joined);
+
+ mainResult.print();
+
+ Plan p = env.createProgramPlan();
+
+ // optimizer should be able to translate this
+ OptimizedPlan op = compileNoStats(p);
+
+ // job graph generator should be able to translate this
+ new JobGraphGenerator().compileJobGraph(op);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java
new file mode 100644
index 0000000..2b42f85
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.junit.Test;
+
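+/**
+ * Tests partitioning pushdown across consecutive aggregations: the optimizer must
+ * not push the second grouping's partitioning below the first aggregation when the
+ * second key set (0) is a subset of the first (0, 1), but it must reuse the
+ * existing partitioning (FORWARD ship strategy) when the second key set is a
+ * superset of the first.
+ */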
+@SuppressWarnings("serial")
+public class PartitionPushdownTest extends CompilerTestBase {
+
+ @Test
+ public void testPartitioningNotPushedDown() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ @SuppressWarnings("unchecked")
+ DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
+
+ input
+ .groupBy(0, 1).sum(2)
+ .groupBy(0).sum(1)
+ .print();
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ SinkPlanNode sink = op.getDataSinks().iterator().next();
+
+ SingleInputPlanNode agg2Reducer = (SingleInputPlanNode) sink.getInput().getSource();
+ SingleInputPlanNode agg2Combiner = (SingleInputPlanNode) agg2Reducer.getInput().getSource();
+ SingleInputPlanNode agg1Reducer = (SingleInputPlanNode) agg2Combiner.getInput().getSource();
+
+ assertEquals(ShipStrategyType.PARTITION_HASH, agg2Reducer.getInput().getShipStrategy());
+ assertEquals(new FieldList(0), agg2Reducer.getInput().getShipStrategyKeys());
+
+ assertEquals(ShipStrategyType.FORWARD, agg2Combiner.getInput().getShipStrategy());
+
+ assertEquals(ShipStrategyType.PARTITION_HASH, agg1Reducer.getInput().getShipStrategy());
+ assertEquals(new FieldList(0, 1), agg1Reducer.getInput().getShipStrategyKeys());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testPartitioningReused() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ @SuppressWarnings("unchecked")
+ DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
+
+ input
+ .groupBy(0).sum(1)
+ .groupBy(0, 1).sum(2)
+ .print();
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ SinkPlanNode sink = op.getDataSinks().iterator().next();
+
+ SingleInputPlanNode agg2Reducer = (SingleInputPlanNode) sink.getInput().getSource();
+ SingleInputPlanNode agg1Reducer = (SingleInputPlanNode) agg2Reducer.getInput().getSource();
+
+ assertEquals(ShipStrategyType.FORWARD, agg2Reducer.getInput().getShipStrategy());
+
+ assertEquals(ShipStrategyType.PARTITION_HASH, agg1Reducer.getInput().getShipStrategy());
+ assertEquals(new FieldList(0), agg1Reducer.getInput().getShipStrategyKeys());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
new file mode 100644
index 0000000..16684dc
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
@@ -0,0 +1,845 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
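+/**
+ * Tests that the optimizer reuses existing hash partitionings (established via
+ * partitionByHash plus forwarded-field annotations) for joins and coGroups, and
+ * that the input partitionings it produces are always compatible: partitioned on
+ * the same number of fields, with matching positions in the respective key sets.
+ */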
+@SuppressWarnings("serial")
+public class PartitioningReusageTest extends CompilerTestBase {
+
+ @Test
+ public void noPreviousPartitioningJoin1() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+ DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+ DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+ .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+ .where(0,1).equalTo(0,1).with(new MockJoin());
+
+ joined.print();
+ JavaPlan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = compileWithStats(plan);
+
+ SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+ DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+ checkValidJoinInputProperties(join);
+
+ }
+
+ @Test
+ public void noPreviousPartitioningJoin2() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+ DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+ DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+ .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+ .where(0,1).equalTo(2,1).with(new MockJoin());
+
+ joined.print();
+ JavaPlan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = compileWithStats(plan);
+
+ SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+ DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+ checkValidJoinInputProperties(join);
+
+ }
+
+ @Test
+ public void reuseSinglePartitioningJoin1() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+ DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+ DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+ .partitionByHash(0,1)
+ .map(new MockMapper()).withForwardedFields("0;1")
+ .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+ .where(0,1).equalTo(0,1).with(new MockJoin());
+
+ joined.print();
+ JavaPlan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = compileWithStats(plan);
+
+ SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+ DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+ checkValidJoinInputProperties(join);
+
+ }
+
+ @Test
+ public void reuseSinglePartitioningJoin2() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+ DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+ DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+ .partitionByHash(0,1)
+ .map(new MockMapper()).withForwardedFields("0;1")
+ .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+ .where(0,1).equalTo(2,1).with(new MockJoin());
+
+ joined.print();
+ JavaPlan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = compileWithStats(plan);
+
+ SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+ DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+ checkValidJoinInputProperties(join);
+ }
+
+ @Test
+ public void reuseSinglePartitioningJoin3() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+ DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+ DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+ .join(set2.partitionByHash(2, 1)
+ .map(new MockMapper())
+ .withForwardedFields("2;1"),
+ JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+ .where(0,1).equalTo(2,1).with(new MockJoin());
+
+ joined.print();
+ JavaPlan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = compileWithStats(plan);
+
+ SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+ DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+ checkValidJoinInputProperties(join);
+ }
+
+ @Test
+ public void reuseSinglePartitioningJoin4() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+ DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+ DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+ .partitionByHash(0)
+ .map(new MockMapper()).withForwardedFields("0")
+ .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+ .where(0,1).equalTo(2,1).with(new MockJoin());
+
+ joined.print();
+ JavaPlan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = compileWithStats(plan);
+
+ SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+ DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+ checkValidJoinInputProperties(join);
+ }
+
+ @Test
+ public void reuseSinglePartitioningJoin5() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+ DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+ DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+ .join(set2.partitionByHash(2)
+ .map(new MockMapper())
+ .withForwardedFields("2"),
+ JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+ .where(0,1).equalTo(2,1).with(new MockJoin());
+
+ joined.print();
+ JavaPlan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = compileWithStats(plan);
+
+ SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+ DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+ checkValidJoinInputProperties(join);
+ }
+
+ @Test
+ public void reuseBothPartitioningJoin1() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+ DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+ DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+ .partitionByHash(0,1)
+ .map(new MockMapper()).withForwardedFields("0;1")
+ .join(set2.partitionByHash(0,1)
+ .map(new MockMapper())
+ .withForwardedFields("0;1"),
+ JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+ .where(0,1).equalTo(0,1).with(new MockJoin());
+
+ joined.print();
+ JavaPlan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = compileWithStats(plan);
+
+ SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+ DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+ checkValidJoinInputProperties(join);
+ }
+
+
+ @Test
+ public void reuseBothPartitioningJoin2() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+ DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+ DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+ .partitionByHash(0,1)
+ .map(new MockMapper()).withForwardedFields("0;1")
+ .join(set2.partitionByHash(1,2)
+ .map(new MockMapper())
+ .withForwardedFields("1;2"),
+ JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+ .where(0,1).equalTo(2,1).with(new MockJoin());
+
+ joined.print();
+ JavaPlan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = compileWithStats(plan);
+
+ SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+ DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+ checkValidJoinInputProperties(join);
+ }
+
+ @Test
+ public void reuseBothPartitioningJoin3() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+ DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+ DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+ .partitionByHash(0)
+ .map(new MockMapper()).withForwardedFields("0")
+ .join(set2.partitionByHash(2,1)
+ .map(new MockMapper())
+ .withForwardedFields("2;1"),
+ JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+ .where(0,1).equalTo(2,1).with(new MockJoin());
+
+ joined.print();
+ JavaPlan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = compileWithStats(plan);
+
+ SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+ DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+ checkValidJoinInputProperties(join);
+ }
+
+ @Test
+ public void reuseBothPartitioningJoin4() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+ DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+ DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+ .partitionByHash(0,2)
+ .map(new MockMapper()).withForwardedFields("0;2")
+ .join(set2.partitionByHash(1)
+ .map(new MockMapper())
+ .withForwardedFields("1"),
+ JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+ .where(0,2).equalTo(2,1).with(new MockJoin());
+
+ joined.print();
+ JavaPlan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = compileWithStats(plan);
+
+ SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+ DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+ checkValidJoinInputProperties(join);
+ }
+
+ @Test
+ public void reuseBothPartitioningJoin5() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+ DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+ DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+ .partitionByHash(2)
+ .map(new MockMapper()).withForwardedFields("2")
+ .join(set2.partitionByHash(1)
+ .map(new MockMapper())
+ .withForwardedFields("1"),
+ JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+ .where(0,2).equalTo(2,1).with(new MockJoin());
+
+ joined.print();
+ JavaPlan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = compileWithStats(plan);
+
+ SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+ DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+ checkValidJoinInputProperties(join);
+ }
+
+ @Test
+ public void reuseBothPartitioningJoin6() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+ DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+ DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+ .partitionByHash(0)
+ .map(new MockMapper()).withForwardedFields("0")
+ .join(set2.partitionByHash(1)
+ .map(new MockMapper())
+ .withForwardedFields("1"),
+ JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+ .where(0,2).equalTo(1,2).with(new MockJoin());
+
+ joined.print();
+ JavaPlan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = compileWithStats(plan);
+
+ SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+ DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+ checkValidJoinInputProperties(join);
+ }
+
+ @Test
+ public void reuseBothPartitioningJoin7() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+ DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+ DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+ .partitionByHash(2)
+ .map(new MockMapper()).withForwardedFields("2")
+ .join(set2.partitionByHash(1)
+ .map(new MockMapper())
+ .withForwardedFields("1"),
+ JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+ .where(0,2).equalTo(1,2).with(new MockJoin());
+
+ joined.print();
+ JavaPlan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = compileWithStats(plan);
+
+ SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+ DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+ checkValidJoinInputProperties(join);
+ }
+
+
+ @Test
+ public void noPreviousPartitioningCoGroup1() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+ DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+ DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+ .coGroup(set2)
+ .where(0,1).equalTo(0,1).with(new MockCoGroup());
+
+ coGrouped.print();
+ JavaPlan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = compileWithStats(plan);
+
+ SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+ DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource();
+
+ checkValidCoGroupInputProperties(coGroup);
+
+ }
+
+ @Test
+ public void noPreviousPartitioningCoGroup2() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+ DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+ DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+ .coGroup(set2)
+ .where(0,1).equalTo(2,1).with(new MockCoGroup());
+
+ coGrouped.print();
+ JavaPlan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = compileWithStats(plan);
+
+ SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+ DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource();
+
+ checkValidCoGroupInputProperties(coGroup);
+
+ }
+
+ @Test
+ public void reuseSinglePartitioningCoGroup1() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+ DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+ DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+ .partitionByHash(0,1)
+ .map(new MockMapper()).withForwardedFields("0;1")
+ .coGroup(set2)
+ .where(0,1).equalTo(0,1).with(new MockCoGroup());
+
+ coGrouped.print();
+ JavaPlan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = compileWithStats(plan);
+
+ SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+ DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource();
+
+ checkValidCoGroupInputProperties(coGroup);
+
+ }
+
+ @Test
+ public void reuseSinglePartitioningCoGroup2() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+ DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+ DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+ .partitionByHash(0,1)
+ .map(new MockMapper()).withForwardedFields("0;1")
+ .coGroup(set2)
+ .where(0,1).equalTo(2,1).with(new MockCoGroup());
+
+ coGrouped.print();
+ JavaPlan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = compileWithStats(plan);
+
+ SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+ DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource();
+
+ checkValidCoGroupInputProperties(coGroup);
+ }
+
+ @Test
+ public void reuseSinglePartitioningCoGroup3() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+ DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+ DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+ .coGroup(set2.partitionByHash(2, 1)
+ .map(new MockMapper())
+ .withForwardedFields("2;1"))
+ .where(0,1).equalTo(2, 1).with(new MockCoGroup());
+
+ coGrouped.print();
+ JavaPlan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = compileWithStats(plan);
+
+ SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+ DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource();
+
+ checkValidCoGroupInputProperties(coGroup);
+ }
+
+ @Test
+ public void reuseSinglePartitioningCoGroup4() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+ DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+ DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+ .partitionByHash(0)
+ .map(new MockMapper()).withForwardedFields("0")
+ .coGroup(set2)
+ .where(0, 1).equalTo(2, 1).with(new MockCoGroup());
+
+ coGrouped.print();
+ JavaPlan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = compileWithStats(plan);
+
+ SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+ DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource();
+
+ checkValidCoGroupInputProperties(coGroup);
+ }
+
+ @Test
+ public void reuseSinglePartitioningCoGroup5() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+ DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+ DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+ .coGroup(set2.partitionByHash(2)
+ .map(new MockMapper())
+ .withForwardedFields("2"))
+ .where(0,1).equalTo(2,1).with(new MockCoGroup());
+
+ coGrouped.print();
+ JavaPlan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = compileWithStats(plan);
+
+ SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+ DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource();
+
+ checkValidCoGroupInputProperties(coGroup);
+ }
+
+ @Test
+ public void reuseBothPartitioningCoGroup1() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+ DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+ DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+ .partitionByHash(0,1)
+ .map(new MockMapper()).withForwardedFields("0;1")
+ .coGroup(set2.partitionByHash(0, 1)
+ .map(new MockMapper())
+ .withForwardedFields("0;1"))
+ .where(0, 1).equalTo(0, 1).with(new MockCoGroup());
+
+ coGrouped.print();
+ JavaPlan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = compileWithStats(plan);
+
+ SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+ DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource();
+
+ checkValidCoGroupInputProperties(coGroup);
+ }
+
+
+ @Test
+ public void reuseBothPartitioningCoGroup2() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+ DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+ DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+ .partitionByHash(0,1)
+ .map(new MockMapper()).withForwardedFields("0;1")
+ .coGroup(set2.partitionByHash(1, 2)
+ .map(new MockMapper())
+ .withForwardedFields("1;2"))
+ .where(0, 1).equalTo(2, 1).with(new MockCoGroup());
+
+ coGrouped.print();
+ JavaPlan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = compileWithStats(plan);
+
+ SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+ DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource();
+
+ checkValidCoGroupInputProperties(coGroup);
+ }
+
+ @Test
+ public void reuseBothPartitioningCoGroup3() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+ DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+ DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+ .partitionByHash(0)
+ .map(new MockMapper()).withForwardedFields("0")
+ .coGroup(set2.partitionByHash(2, 1)
+ .map(new MockMapper())
+ .withForwardedFields("2;1"))
+ .where(0, 1).equalTo(2, 1).with(new MockCoGroup());
+
+ coGrouped.print();
+ JavaPlan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = compileWithStats(plan);
+
+ SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+ DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource();
+
+ checkValidCoGroupInputProperties(coGroup);
+ }
+
+ @Test
+ public void reuseBothPartitioningCoGroup4() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+ DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+ DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+ .partitionByHash(0,2)
+ .map(new MockMapper()).withForwardedFields("0;2")
+ .coGroup(set2.partitionByHash(1)
+ .map(new MockMapper())
+ .withForwardedFields("1"))
+ .where(0, 2).equalTo(2, 1).with(new MockCoGroup());
+
+ coGrouped.print();
+ JavaPlan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = compileWithStats(plan);
+
+ SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+ DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource();
+
+ checkValidCoGroupInputProperties(coGroup);
+ }
+
+ @Test
+ public void reuseBothPartitioningCoGroup5() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+ DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+ DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+ .partitionByHash(2)
+ .map(new MockMapper()).withForwardedFields("2")
+ .coGroup(set2.partitionByHash(1)
+ .map(new MockMapper())
+ .withForwardedFields("1"))
+ .where(0, 2).equalTo(2, 1).with(new MockCoGroup());
+
+ coGrouped.print();
+ JavaPlan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = compileWithStats(plan);
+
+ SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+ DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource();
+
+ checkValidCoGroupInputProperties(coGroup);
+ }
+
+ @Test
+ public void reuseBothPartitioningCoGroup6() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+ DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+ DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+ .partitionByHash(2)
+ .map(new MockMapper()).withForwardedFields("2")
+ .coGroup(set2.partitionByHash(2)
+ .map(new MockMapper())
+ .withForwardedFields("2"))
+ .where(0, 2).equalTo(1, 2).with(new MockCoGroup());
+
+ coGrouped.print();
+ JavaPlan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = compileWithStats(plan);
+
+ SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+ DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource();
+
+ checkValidCoGroupInputProperties(coGroup);
+ }
+
+ @Test
+ public void reuseBothPartitioningCoGroup7() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+ DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+ DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+ .partitionByHash(2)
+ .map(new MockMapper()).withForwardedFields("2")
+ .coGroup(set2.partitionByHash(1)
+ .map(new MockMapper())
+ .withForwardedFields("1"))
+ .where(0, 2).equalTo(1, 2).with(new MockCoGroup());
+
+ coGrouped.print();
+ JavaPlan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = compileWithStats(plan);
+
+ SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+ DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource();
+
+ checkValidCoGroupInputProperties(coGroup);
+ }
+
+
+
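+ /**
+ * Verifies that the join inputs are compatibly partitioned: either both hash
+ * partitioned on corresponding key fields, or one side fully replicated.
+ */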
+ private void checkValidJoinInputProperties(DualInputPlanNode join) {
+
+ GlobalProperties inProps1 = join.getInput1().getGlobalProperties();
+ GlobalProperties inProps2 = join.getInput2().getGlobalProperties();
+
+ if(inProps1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED &&
+ inProps2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) {
+
+ // check that both inputs are hash partitioned on the same fields
+ FieldList pFields1 = inProps1.getPartitioningFields();
+ FieldList pFields2 = inProps2.getPartitioningFields();
+
+ assertTrue("Inputs are not the same number of fields. Input 1: "+pFields1+", Input 2: "+pFields2,
+ pFields1.size() == pFields2.size());
+
+ FieldList reqPFields1 = join.getKeysForInput1();
+ FieldList reqPFields2 = join.getKeysForInput2();
+
+ for(int i=0; i<pFields1.size(); i++) {
+
+ // get fields
+ int f1 = pFields1.get(i);
+ int f2 = pFields2.get(i);
+
+ // check that field positions in original key field list are identical
+ int pos1 = getPosInFieldList(f1, reqPFields1);
+ int pos2 = getPosInFieldList(f2, reqPFields2);
+
+ if(pos1 < 0) {
+ fail("Input 1 is partitioned on field "+f1+" which is not contained in the key set "+reqPFields1);
+ }
+ if(pos2 < 0) {
+ fail("Input 2 is partitioned on field "+f2+" which is not contained in the key set "+reqPFields2);
+ }
+ if(pos1 != pos2) {
+ fail("Inputs are not partitioned on the same key fields");
+ }
+ }
+
+ }
+ else if(inProps1.getPartitioning() == PartitioningProperty.FULL_REPLICATION &&
+ inProps2.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) {
+ // we are good. No need to check for fields
+ }
+ else if(inProps1.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED &&
+ inProps2.getPartitioning() == PartitioningProperty.FULL_REPLICATION) {
+ // we are good. No need to check for fields
+ }
+ else {
+ throw new UnsupportedOperationException("This method has only been implemented to check for hash partitioned join inputs");
+ }
+
+ }
+
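+ /**
+ * Verifies that both coGroup inputs are hash partitioned on corresponding key fields.
+ */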
+ private void checkValidCoGroupInputProperties(DualInputPlanNode coGroup) {
+
+ GlobalProperties inProps1 = coGroup.getInput1().getGlobalProperties();
+ GlobalProperties inProps2 = coGroup.getInput2().getGlobalProperties();
+
+ if(inProps1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED &&
+ inProps2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) {
+
+ // check that both inputs are hash partitioned on the same fields
+ FieldList pFields1 = inProps1.getPartitioningFields();
+ FieldList pFields2 = inProps2.getPartitioningFields();
+
+ assertTrue("Inputs are not the same number of fields. Input 1: "+pFields1+", Input 2: "+pFields2,
+ pFields1.size() == pFields2.size());
+
+ FieldList reqPFields1 = coGroup.getKeysForInput1();
+ FieldList reqPFields2 = coGroup.getKeysForInput2();
+
+ for(int i=0; i<pFields1.size(); i++) {
+
+ // get fields
+ int f1 = pFields1.get(i);
+ int f2 = pFields2.get(i);
+
+ // check that field positions in original key field list are identical
+ int pos1 = getPosInFieldList(f1, reqPFields1);
+ int pos2 = getPosInFieldList(f2, reqPFields2);
+
+ if(pos1 < 0) {
+ fail("Input 1 is partitioned on field "+f1+" which is not contained in the key set "+reqPFields1);
+ }
+ if(pos2 < 0) {
+ fail("Input 2 is partitioned on field "+f2+" which is not contained in the key set "+reqPFields2);
+ }
+ if(pos1 != pos2) {
+ fail("Inputs are not partitioned on the same key fields");
+ }
+ }
+
+ }
+ else {
+ throw new UnsupportedOperationException("This method has only been implemented to check for hash partitioned coGroup inputs");
+ }
+
+ }
+
+ private int getPosInFieldList(int field, FieldList list) {
+ // linear scan: returns the position of 'field' in 'list', or -1 if not contained
+ for (int pos = 0; pos < list.size(); pos++) {
+ if (field == list.get(pos)) {
+ return pos;
+ }
+ }
+ return -1;
+ }
+
+
+
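+ // The mock functions below are never executed; the optimizer only inspects their
+ // types and annotations during plan compilation, so returning null is harmless.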
+ public static class MockMapper implements MapFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> {
+ @Override
+ public Tuple3<Integer, Integer, Integer> map(Tuple3<Integer, Integer, Integer> value) throws Exception {
+ return null;
+ }
+ }
+
+ public static class MockJoin implements JoinFunction<Tuple3<Integer, Integer, Integer>,
+ Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> {
+
+ @Override
+ public Tuple3<Integer, Integer, Integer> join(Tuple3<Integer, Integer, Integer> first, Tuple3<Integer, Integer, Integer> second) throws Exception {
+ return null;
+ }
+ }
+
+ public static class MockCoGroup implements CoGroupFunction<Tuple3<Integer, Integer, Integer>,
+ Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> {
+
+ @Override
+ public void coGroup(Iterable<Tuple3<Integer, Integer, Integer>> first, Iterable<Tuple3<Integer, Integer, Integer>> second,
+ Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
+
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
new file mode 100644
index 0000000..86f01b0
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
+import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
+import org.junit.Test;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.configuration.Configuration;
+
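+/**
+ * Tests that the optimizer inserts pipeline breakers (temp barriers) where a data
+ * flow branches and is re-joined, e.g. when a data set feeds an operator both
+ * directly and as a broadcast variable, since such plans could otherwise deadlock.
+ */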
+@SuppressWarnings("serial")
+public class PipelineBreakerTest extends CompilerTestBase {
+
+ @Test
+ public void testPipelineBreakerWithBroadcastVariable() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(64);
+
+ DataSet<Long> source = env.generateSequence(1, 10).map(new IdentityMapper<Long>());
+
+ DataSet<Long> result = source.map(new IdentityMapper<Long>())
+ .map(new IdentityMapper<Long>())
+ .withBroadcastSet(source, "bc");
+
+ result.print();
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ SinkPlanNode sink = op.getDataSinks().iterator().next();
+ SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource();
+
+ assertTrue(mapper.getInput().getTempMode().breaksPipeline());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testPipelineBreakerBroadcastedAllReduce() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(64);
+
+ DataSet<Long> sourceWithMapper = env.generateSequence(1, 10).map(new IdentityMapper<Long>());
+
+ DataSet<Long> bcInput1 = sourceWithMapper
+ .map(new IdentityMapper<Long>())
+ .reduce(new SelectOneReducer<Long>());
+ DataSet<Long> bcInput2 = env.generateSequence(1, 10);
+
+ DataSet<Long> result = sourceWithMapper
+ .map(new IdentityMapper<Long>())
+ .withBroadcastSet(bcInput1, "bc1")
+ .withBroadcastSet(bcInput2, "bc2");
+
+ result.print();
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ SinkPlanNode sink = op.getDataSinks().iterator().next();
+ SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource();
+
+ assertTrue(mapper.getInput().getTempMode().breaksPipeline());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testPipelineBreakerBroadcastedPartialSolution() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(64);
+
+
+ DataSet<Long> initialSource = env.generateSequence(1, 10);
+ IterativeDataSet<Long> iteration = initialSource.iterate(100);
+
+
+ DataSet<Long> sourceWithMapper = env.generateSequence(1, 10).map(new IdentityMapper<Long>());
+
+ DataSet<Long> bcInput1 = sourceWithMapper
+ .map(new IdentityMapper<Long>())
+ .reduce(new SelectOneReducer<Long>());
+
+ DataSet<Long> result = sourceWithMapper
+ .map(new IdentityMapper<Long>())
+ .withBroadcastSet(iteration, "bc2")
+ .withBroadcastSet(bcInput1, "bc1");
+
+
+ iteration.closeWith(result).print();
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ SinkPlanNode sink = op.getDataSinks().iterator().next();
+ BulkIterationPlanNode iterationPlanNode = (BulkIterationPlanNode) sink.getInput().getSource();
+ SingleInputPlanNode mapper = (SingleInputPlanNode) iterationPlanNode.getRootOfStepFunction();
+
+ assertTrue(mapper.getInput().getTempMode().breaksPipeline());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
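+ // Each block below hints a different nested-loop strategy for a cross whose two
+ // inputs stem from the same source; the optimizer must break the pipeline on the
+ // outer side to avoid a dataflow deadlock.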
+ @Test
+ public void testPipelineBreakerWithCross() {
+ try {
+ {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(64);
+
+ DataSet<Long> initialSource = env.generateSequence(1, 10);
+
+ Configuration conf= new Configuration();
+ conf.setString(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST);
+ initialSource
+ .map(new IdentityMapper<Long>())
+ .cross(initialSource).withParameters(conf)
+ .print();
+
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+ SinkPlanNode sink = op.getDataSinks().iterator().next();
+ DualInputPlanNode mapper = (DualInputPlanNode) sink.getInput().getSource();
+
+ assertTrue(mapper.getInput1().getTempMode().breaksPipeline());
+ }
+
+ {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(64);
+
+ DataSet<Long> initialSource = env.generateSequence(1, 10);
+
+ Configuration conf= new Configuration();
+ conf.setString(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND);
+ initialSource
+ .map(new IdentityMapper<Long>())
+ .cross(initialSource).withParameters(conf)
+ .print();
+
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ SinkPlanNode sink = op.getDataSinks().iterator().next();
+ DualInputPlanNode mapper = (DualInputPlanNode) sink.getInput().getSource();
+
+ assertTrue(mapper.getInput2().getTempMode().breaksPipeline());
+ }
+
+ {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(64);
+
+ DataSet<Long> initialSource = env.generateSequence(1, 10);
+
+ Configuration conf= new Configuration();
+ conf.setString(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST);
+ initialSource
+ .map(new IdentityMapper<Long>())
+ .cross(initialSource).withParameters(conf)
+ .print();
+
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ SinkPlanNode sink = op.getDataSinks().iterator().next();
+ DualInputPlanNode mapper = (DualInputPlanNode) sink.getInput().getSource();
+
+ assertTrue(mapper.getInput1().getTempMode().breaksPipeline());
+ }
+
+ {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(64);
+
+ DataSet<Long> initialSource = env.generateSequence(1, 10);
+
+ Configuration conf= new Configuration();
+ conf.setString(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND);
+ initialSource
+ .map(new IdentityMapper<Long>())
+ .cross(initialSource).withParameters(conf)
+ .print();
+
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ SinkPlanNode sink = op.getDataSinks().iterator().next();
+ DualInputPlanNode mapper = (DualInputPlanNode) sink.getInput().getSource();
+
+ assertTrue(mapper.getInput2().getTempMode().breaksPipeline());
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
new file mode 100644
index 0000000..7be2b16
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
@@ -0,0 +1,897 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
+import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.SourcePlanNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
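+/**
+ * Tests that data properties declared on a source via getSplitDataProperties()
+ * (e.g. splitsPartitionedBy) surface as the source node's global and local
+ * properties in the optimized plan.
+ */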
+@SuppressWarnings({"serial"})
+public class PropertyDataSourceTest extends CompilerTestBase {
+
+ private List<Tuple3<Long, SomePojo, String>> tuple3PojoData = new ArrayList<Tuple3<Long, SomePojo, String>>();
+ private TupleTypeInfo<Tuple3<Long, SomePojo, String>> tuple3PojoType = new TupleTypeInfo<Tuple3<Long, SomePojo, String>>(
+ BasicTypeInfo.LONG_TYPE_INFO,
+ TypeExtractor.createTypeInfo(SomePojo.class),
+ BasicTypeInfo.STRING_TYPE_INFO
+ );
+
+ @Test
+ public void checkSinglePartitionedSource1() {
+
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+ env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+ DataSource<Tuple2<Long, String>> data =
+ env.readCsvFile("/some/path").types(Long.class, String.class);
+
+ data.getSplitDataProperties()
+ .splitsPartitionedBy(0);
+
+ data.print();
+
+ JavaPlan plan = env.createProgramPlan();
+
+ // submit the plan to the compiler
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ // check the optimized Plan
+ SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+ SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+ GlobalProperties gprops = sourceNode.getGlobalProperties();
+ LocalProperties lprops = sourceNode.getLocalProperties();
+
+ Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0)));
+ Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+ Assert.assertTrue(lprops.getGroupedFields() == null);
+ Assert.assertTrue(lprops.getOrdering() == null);
+
+ }
+
+ @Test
+ public void checkSinglePartitionedSource2() {
+
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+ env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+ DataSource<Tuple2<Long, String>> data =
+ env.readCsvFile("/some/path").types(Long.class, String.class);
+
+ data.getSplitDataProperties()
+ .splitsPartitionedBy(1, 0);
+
+ data.print();
+
+ JavaPlan plan = env.createProgramPlan();
+
+ // submit the plan to the compiler
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ // check the optimized Plan
+ SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+ SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+ GlobalProperties gprops = sourceNode.getGlobalProperties();
+ LocalProperties lprops = sourceNode.getLocalProperties();
+
+ Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0, 1)));
+ Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+ Assert.assertTrue(lprops.getGroupedFields() == null);
+ Assert.assertTrue(lprops.getOrdering() == null);
+
+ }
+
+ @Test
+ public void checkSinglePartitionedSource3() {
+
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+ env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+ DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+ data.getSplitDataProperties()
+ .splitsPartitionedBy("*");
+
+ data.print();
+
+ JavaPlan plan = env.createProgramPlan();
+
+ // submit the plan to the compiler
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ // check the optimized Plan
+ SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+ SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+ GlobalProperties gprops = sourceNode.getGlobalProperties();
+ LocalProperties lprops = sourceNode.getLocalProperties();
+
+ Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0, 1, 2, 3, 4)));
+ Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+ Assert.assertTrue(lprops.getGroupedFields() == null);
+ Assert.assertTrue(lprops.getOrdering() == null);
+
+ }
+
+ @Test
+ public void checkSinglePartitionedSource4() {
+
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+ env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+ DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+ data.getSplitDataProperties()
+ .splitsPartitionedBy("f1");
+
+ data.print();
+
+ JavaPlan plan = env.createProgramPlan();
+
+ // submit the plan to the compiler
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ // check the optimized Plan
+ SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+ SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+ GlobalProperties gprops = sourceNode.getGlobalProperties();
+ LocalProperties lprops = sourceNode.getLocalProperties();
+
+ Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(1, 2, 3)));
+ Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+ Assert.assertTrue(lprops.getGroupedFields() == null);
+ Assert.assertTrue(lprops.getOrdering() == null);
+
+ }
+
+ @Test
+ public void checkSinglePartitionedSource5() {
+
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+ env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+ DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+ data.getSplitDataProperties()
+ .splitsPartitionedBy("f1.stringField");
+
+ data.print();
+
+ JavaPlan plan = env.createProgramPlan();
+
+ // submit the plan to the compiler
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ // check the optimized Plan
+ SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+ SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+ GlobalProperties gprops = sourceNode.getGlobalProperties();
+ LocalProperties lprops = sourceNode.getLocalProperties();
+
+ Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(3)));
+ Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+ Assert.assertTrue(lprops.getGroupedFields() == null);
+ Assert.assertTrue(lprops.getOrdering() == null);
+
+ }
+
+ @Test
+ public void checkSinglePartitionedSource6() {
+
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+ env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+ DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+ data.getSplitDataProperties()
+ .splitsPartitionedBy("f1.intField; f2");
+
+ data.print();
+
+ JavaPlan plan = env.createProgramPlan();
+
+ // submit the plan to the compiler
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ // check the optimized Plan
+ SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+ SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+ GlobalProperties gprops = sourceNode.getGlobalProperties();
+ LocalProperties lprops = sourceNode.getLocalProperties();
+
+ Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(2, 4)));
+ Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+ Assert.assertTrue(lprops.getGroupedFields() == null);
+ Assert.assertTrue(lprops.getOrdering() == null);
+
+ }
+
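+ // A partitioning declared with an identifier ("byDate") should surface as
+ // CUSTOM_PARTITIONING with a non-null custom partitioner.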
+ @Test
+ public void checkSinglePartitionedSource7() {
+
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+ env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+ DataSource<Tuple2<Long, String>> data =
+ env.readCsvFile("/some/path").types(Long.class, String.class);
+
+ data.getSplitDataProperties()
+ .splitsPartitionedBy("byDate", 1, 0);
+
+ data.print();
+
+ JavaPlan plan = env.createProgramPlan();
+
+ // submit the plan to the compiler
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ // check the optimized Plan
+ SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+ SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+ GlobalProperties gprops = sourceNode.getGlobalProperties();
+ LocalProperties lprops = sourceNode.getLocalProperties();
+
+ Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0, 1)));
+ Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING);
+ Assert.assertTrue(gprops.getCustomPartitioner() != null);
+ Assert.assertTrue(lprops.getGroupedFields() == null);
+ Assert.assertTrue(lprops.getOrdering() == null);
+
+ }
+
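+ // Grouping declared on the splits survives as a local property only if the grouping
+ // fields cover all partitioning fields; presumably only then is each grouping key
+ // confined to a single split.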
+ @Test
+ public void checkSinglePartitionedGroupedSource1() {
+
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+ env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+ DataSource<Tuple2<Long, String>> data =
+ env.readCsvFile("/some/path").types(Long.class, String.class);
+
+ data.getSplitDataProperties()
+ .splitsPartitionedBy(0)
+ .splitsGroupedBy(0);
+
+ data.print();
+
+ JavaPlan plan = env.createProgramPlan();
+
+ // submit the plan to the compiler
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ // check the optimized Plan
+ SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+ SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+ GlobalProperties gprops = sourceNode.getGlobalProperties();
+ LocalProperties lprops = sourceNode.getLocalProperties();
+
+ Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0)));
+ Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+ Assert.assertTrue(new FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(0)));
+ Assert.assertTrue(lprops.getOrdering() == null);
+
+ }
+
+ @Test
+ public void checkSinglePartitionedGroupedSource2() {
+
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+ env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+ DataSource<Tuple2<Long, String>> data =
+ env.readCsvFile("/some/path").types(Long.class, String.class);
+
+ data.getSplitDataProperties()
+ .splitsPartitionedBy(0)
+ .splitsGroupedBy(1, 0);
+
+ data.print();
+
+ JavaPlan plan = env.createProgramPlan();
+
+ // submit the plan to the compiler
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ // check the optimized Plan
+ SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+ SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+ GlobalProperties gprops = sourceNode.getGlobalProperties();
+ LocalProperties lprops = sourceNode.getLocalProperties();
+
+ Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0)));
+ Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+ Assert.assertTrue(new FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(0, 1)));
+ Assert.assertTrue(lprops.getOrdering() == null);
+
+ }
+
+ @Test
+ public void checkSinglePartitionedGroupedSource3() {
+
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+ env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+ DataSource<Tuple2<Long, String>> data =
+ env.readCsvFile("/some/path").types(Long.class, String.class);
+
+ data.getSplitDataProperties()
+ .splitsPartitionedBy(1)
+ .splitsGroupedBy(0);
+
+ data.print();
+
+ JavaPlan plan = env.createProgramPlan();
+
+ // submit the plan to the compiler
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ // check the optimized Plan
+ SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+ SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+ GlobalProperties gprops = sourceNode.getGlobalProperties();
+ LocalProperties lprops = sourceNode.getLocalProperties();
+
+ Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(1)));
+ Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
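+ // grouping on field 0 does not cover partitioning field 1, so it is discarded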
+ Assert.assertTrue(lprops.getGroupedFields() == null);
+ Assert.assertTrue(lprops.getOrdering() == null);
+
+ }
+
+ @Test
+ public void checkSinglePartitionedGroupedSource4() {
+
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+ env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+ DataSource<Tuple2<Long, String>> data =
+ env.readCsvFile("/some/path").types(Long.class, String.class);
+
+ data.getSplitDataProperties()
+ .splitsPartitionedBy(0, 1)
+ .splitsGroupedBy(0);
+
+ data.print();
+
+ JavaPlan plan = env.createProgramPlan();
+
+ // submit the plan to the compiler
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ // check the optimized Plan
+ SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+ SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+ GlobalProperties gprops = sourceNode.getGlobalProperties();
+ LocalProperties lprops = sourceNode.getLocalProperties();
+
+ Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0, 1)));
+ Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+ Assert.assertTrue(lprops.getGroupedFields() == null);
+ Assert.assertTrue(lprops.getOrdering() == null);
+
+ }
+
+ @Test
+ public void checkSinglePartitionedGroupedSource5() {
+
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+ env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+ DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+ data.getSplitDataProperties()
+ .splitsPartitionedBy("f2")
+ .splitsGroupedBy("f2");
+
+ data.print();
+
+ JavaPlan plan = env.createProgramPlan();
+
+ // submit the plan to the compiler
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ // check the optimized Plan
+ SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+ SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+ GlobalProperties gprops = sourceNode.getGlobalProperties();
+ LocalProperties lprops = sourceNode.getLocalProperties();
+
+ Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(4)));
+ Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+ Assert.assertTrue(new FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(4)));
+ Assert.assertTrue(lprops.getOrdering() == null);
+
+ }
+
+ @Test
+ public void checkSinglePartitionedGroupedSource6() {
+
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+ env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+ DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+ data.getSplitDataProperties()
+ .splitsPartitionedBy("f1.intField")
+ .splitsGroupedBy("f0; f1.intField");
+
+ data.print();
+
+ JavaPlan plan = env.createProgramPlan();
+
+ // submit the plan to the compiler
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ // check the optimized Plan
+ SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+ SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+ GlobalProperties gprops = sourceNode.getGlobalProperties();
+ LocalProperties lprops = sourceNode.getLocalProperties();
+
+ Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(2)));
+ Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+ Assert.assertTrue(new FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(0, 2)));
+ Assert.assertTrue(lprops.getOrdering() == null);
+
+ }
+
+ @Test
+ public void checkSinglePartitionedGroupedSource7() {
+
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+ env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+ DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+ data.getSplitDataProperties()
+ .splitsPartitionedBy("f1.intField")
+ .splitsGroupedBy("f1");
+
+ data.print();
+
+ JavaPlan plan = env.createProgramPlan();
+
+ // submit the plan to the compiler
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ // check the optimized Plan
+ SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+ SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+ GlobalProperties gprops = sourceNode.getGlobalProperties();
+ LocalProperties lprops = sourceNode.getLocalProperties();
+
+ Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(2)));
+ Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+ Assert.assertTrue(new FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(1, 2, 3)));
+ Assert.assertTrue(lprops.getOrdering() == null);
+
+ }
+
+ @Test
+ public void checkSinglePartitionedGroupedSource8() {
+
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+ env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+ DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+ data.getSplitDataProperties()
+ .splitsPartitionedBy("f1")
+ .splitsGroupedBy("f1.stringField");
+
+ data.print();
+
+ JavaPlan plan = env.createProgramPlan();
+
+ // submit the plan to the compiler
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ // check the optimized Plan
+ SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+ SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+ GlobalProperties gprops = sourceNode.getGlobalProperties();
+ LocalProperties lprops = sourceNode.getLocalProperties();
+
+ Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(1, 2, 3)));
+ Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
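+ // grouping on f1.stringField (position 3) does not cover all flattened fields of the
+ // partitioning key f1 (positions 1, 2, 3), so it is discarded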
+ Assert.assertTrue(lprops.getGroupedFields() == null);
+ Assert.assertTrue(lprops.getOrdering() == null);
+
+ }
+
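+ // An order declared on the splits is expected to degrade to grouping: a source subtask
+ // may read several splits back to back, which preserves grouping but not a global order.
+ // Like grouping, it is only kept when the ordering fields cover the partitioning fields.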
+ @Test
+ public void checkSinglePartitionedOrderedSource1() {
+
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+ env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+ DataSource<Tuple2<Long, String>> data =
+ env.readCsvFile("/some/path").types(Long.class, String.class);
+
+ data.getSplitDataProperties()
+ .splitsPartitionedBy(1)
+ .splitsOrderedBy(new int[]{1}, new Order[]{Order.ASCENDING});
+
+ data.print();
+
+ JavaPlan plan = env.createProgramPlan();
+
+ // submit the plan to the compiler
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ // check the optimized Plan
+ SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+ SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+ GlobalProperties gprops = sourceNode.getGlobalProperties();
+ LocalProperties lprops = sourceNode.getLocalProperties();
+
+ Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(1)));
+ Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+ Assert.assertTrue((new FieldSet(lprops.getGroupedFields().toArray())).equals(new FieldSet(1)));
+ Assert.assertTrue(lprops.getOrdering() == null);
+
+ }
+
+ @Test
+ public void checkSinglePartitionedOrderedSource2() {
+
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+ env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+ DataSource<Tuple2<Long, String>> data =
+ env.readCsvFile("/some/path").types(Long.class, String.class);
+
+ data.getSplitDataProperties()
+ .splitsPartitionedBy(1)
+ .splitsOrderedBy(new int[]{1, 0}, new Order[]{Order.ASCENDING, Order.DESCENDING});
+
+ data.print();
+
+ JavaPlan plan = env.createProgramPlan();
+
+ // submit the plan to the compiler
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ // check the optimized Plan
+ SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+ SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+ GlobalProperties gprops = sourceNode.getGlobalProperties();
+ LocalProperties lprops = sourceNode.getLocalProperties();
+
+ Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(1)));
+ Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+ Assert.assertTrue((new FieldSet(lprops.getGroupedFields().toArray())).equals(new FieldSet(1, 0)));
+ Assert.assertTrue(lprops.getOrdering() == null);
+
+ }
+
+ @Test
+ public void checkSinglePartitionedOrderedSource3() {
+
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+ env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+ DataSource<Tuple2<Long, String>> data =
+ env.readCsvFile("/some/path").types(Long.class, String.class);
+
+ data.getSplitDataProperties()
+ .splitsPartitionedBy(0)
+ .splitsOrderedBy(new int[]{1}, new Order[]{Order.ASCENDING});
+
+ data.print();
+
+ JavaPlan plan = env.createProgramPlan();
+
+ // submit the plan to the compiler
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ // check the optimized Plan
+ SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+ SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+ GlobalProperties gprops = sourceNode.getGlobalProperties();
+ LocalProperties lprops = sourceNode.getLocalProperties();
+
+ Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0)));
+ Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
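+ // the order on field 1 does not cover partitioning field 0, so no local property survives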
+ Assert.assertTrue(lprops.getGroupedFields() == null);
+ Assert.assertTrue(lprops.getOrdering() == null);
+
+ }
+
+ @Test
+ public void checkSinglePartitionedOrderedSource4() {
+
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+ env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+ DataSource<Tuple2<Long, String>> data =
+ env.readCsvFile("/some/path").types(Long.class, String.class);
+
+ data.getSplitDataProperties()
+ .splitsPartitionedBy(0, 1)
+ .splitsOrderedBy(new int[]{1}, new Order[]{Order.DESCENDING});
+
+ data.print();
+
+ JavaPlan plan = env.createProgramPlan();
+
+ // submit the plan to the compiler
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ // check the optimized Plan
+ SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+ SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+ GlobalProperties gprops = sourceNode.getGlobalProperties();
+ LocalProperties lprops = sourceNode.getLocalProperties();
+
+ Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0, 1)));
+ Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+ Assert.assertTrue(lprops.getGroupedFields() == null);
+ Assert.assertTrue(lprops.getOrdering() == null);
+
+ }
+
+ @Test
+ public void checkSinglePartitionedOrderedSource5() {
+
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+ env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+ DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+ data.getSplitDataProperties()
+ .splitsPartitionedBy("f1.intField")
+ .splitsOrderedBy("f0; f1.intField", new Order[]{Order.ASCENDING, Order.DESCENDING});
+
+ data.print();
+
+ JavaPlan plan = env.createProgramPlan();
+
+ // submit the plan to the compiler
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ // check the optimized Plan
+ SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+ SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+ GlobalProperties gprops = sourceNode.getGlobalProperties();
+ LocalProperties lprops = sourceNode.getLocalProperties();
+
+ Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(2)));
+ Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+ Assert.assertTrue(new FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(0, 2)));
+ Assert.assertTrue(lprops.getOrdering() == null);
+
+ }
+
+ @Test
+ public void checkSinglePartitionedOrderedSource6() {
+
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+ env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+ DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+ data.getSplitDataProperties()
+ .splitsPartitionedBy("f1.intField")
+ .splitsOrderedBy("f1", new Order[]{Order.DESCENDING});
+
+ data.print();
+
+ JavaPlan plan = env.createProgramPlan();
+
+ // submit the plan to the compiler
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ // check the optimized Plan
+ SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+ SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+ GlobalProperties gprops = sourceNode.getGlobalProperties();
+ LocalProperties lprops = sourceNode.getLocalProperties();
+
+ Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(2)));
+ Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+ Assert.assertTrue(new FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(1, 2, 3)));
+ Assert.assertTrue(lprops.getOrdering() == null);
+
+ }
+
+ @Test
+ public void checkSinglePartitionedOrderedSource7() {
+
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+ env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+ DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+ data.getSplitDataProperties()
+ .splitsPartitionedBy("f1")
+ .splitsOrderedBy("f1.stringField", new Order[]{Order.ASCENDING});
+
+ data.print();
+
+ JavaPlan plan = env.createProgramPlan();
+
+ // submit the plan to the compiler
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ // check the optimized Plan
+ SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+ SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor();
+
+ GlobalProperties gprops = sourceNode.getGlobalProperties();
+ LocalProperties lprops = sourceNode.getLocalProperties();
+
+ Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(1, 2, 3)));
+ Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING);
+ Assert.assertTrue(lprops.getGroupedFields() == null);
+ Assert.assertTrue(lprops.getOrdering() == null);
+
+ }
+
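+ // Sources declaring the same partitioner identifier are co-partitioned: their custom
+ // partitioners must compare equal. Different identifiers must yield unequal partitioners.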
+ @Test
+ public void checkCoPartitionedSources1() {
+
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+ env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+ DataSource<Tuple2<Long, String>> data1 =
+ env.readCsvFile("/some/path").types(Long.class, String.class);
+
+ data1.getSplitDataProperties()
+ .splitsPartitionedBy("byDate", 0);
+
+ DataSource<Tuple2<Long, String>> data2 =
+ env.readCsvFile("/some/path").types(Long.class, String.class);
+
+ data2.getSplitDataProperties()
+ .splitsPartitionedBy("byDate", 0);
+
+ data1.union(data2).print();
+
+ JavaPlan plan = env.createProgramPlan();
+
+ // submit the plan to the compiler
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ // check the optimized Plan
+ SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+ SourcePlanNode sourceNode1 = (SourcePlanNode) ((NAryUnionPlanNode)sinkNode.getPredecessor()).getListOfInputs().get(0).getSource();
+ SourcePlanNode sourceNode2 = (SourcePlanNode) ((NAryUnionPlanNode)sinkNode.getPredecessor()).getListOfInputs().get(1).getSource();
+
+ GlobalProperties gprops1 = sourceNode1.getGlobalProperties();
+ LocalProperties lprops1 = sourceNode1.getLocalProperties();
+ GlobalProperties gprops2 = sourceNode2.getGlobalProperties();
+ LocalProperties lprops2 = sourceNode2.getLocalProperties();
+
+ Assert.assertTrue((new FieldSet(gprops1.getPartitioningFields().toArray())).equals(new FieldSet(0)));
+ Assert.assertTrue(gprops1.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING);
+ Assert.assertTrue(lprops1.getGroupedFields() == null);
+ Assert.assertTrue(lprops1.getOrdering() == null);
+
+ Assert.assertTrue((new FieldSet(gprops2.getPartitioningFields().toArray())).equals(new FieldSet(0)));
+ Assert.assertTrue(gprops2.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING);
+ Assert.assertTrue(lprops2.getGroupedFields() == null);
+ Assert.assertTrue(lprops2.getOrdering() == null);
+
+ Assert.assertTrue(gprops1.getCustomPartitioner().equals(gprops2.getCustomPartitioner()));
+ }
+
+ @Test
+ public void checkCoPartitionedSources2() {
+
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+ env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+ DataSource<Tuple2<Long, String>> data1 =
+ env.readCsvFile("/some/path").types(Long.class, String.class);
+
+ data1.getSplitDataProperties()
+ .splitsPartitionedBy("byCountry", 0);
+
+ DataSource<Tuple2<Long, String>> data2 =
+ env.readCsvFile("/some/path").types(Long.class, String.class);
+
+ data2.getSplitDataProperties()
+ .splitsPartitionedBy("byDate", 0);
+
+ data1.union(data2).print();
+
+ JavaPlan plan = env.createProgramPlan();
+
+ // submit the plan to the compiler
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ // check the optimized Plan
+ SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+ SourcePlanNode sourceNode1 = (SourcePlanNode) ((NAryUnionPlanNode)sinkNode.getPredecessor()).getListOfInputs().get(0).getSource();
+ SourcePlanNode sourceNode2 = (SourcePlanNode) ((NAryUnionPlanNode)sinkNode.getPredecessor()).getListOfInputs().get(1).getSource();
+
+ GlobalProperties gprops1 = sourceNode1.getGlobalProperties();
+ LocalProperties lprops1 = sourceNode1.getLocalProperties();
+ GlobalProperties gprops2 = sourceNode2.getGlobalProperties();
+ LocalProperties lprops2 = sourceNode2.getLocalProperties();
+
+ Assert.assertTrue((new FieldSet(gprops1.getPartitioningFields().toArray())).equals(new FieldSet(0)));
+ Assert.assertTrue(gprops1.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING);
+ Assert.assertTrue(lprops1.getGroupedFields() == null);
+ Assert.assertTrue(lprops1.getOrdering() == null);
+
+ Assert.assertTrue((new FieldSet(gprops2.getPartitioningFields().toArray())).equals(new FieldSet(0)));
+ Assert.assertTrue(gprops2.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING);
+ Assert.assertTrue(lprops2.getGroupedFields() == null);
+ Assert.assertTrue(lprops2.getOrdering() == null);
+
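+ // "byCountry" and "byDate" identify different partitionings, so the partitioners must differ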
+ Assert.assertFalse(gprops1.getCustomPartitioner().equals(gprops2.getCustomPartitioner()));
+ }
+
+ public static class SomePojo {
+ public double doubleField;
+ public int intField;
+ public String stringField;
+ }
+
+}