You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:06:41 UTC
[02/53] [abbrv] flink git commit: [optimizer] Rename optimizer
project to "flink-optimizer" (previously flink-compiler)
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalPropertiesFilteringTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalPropertiesFilteringTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalPropertiesFilteringTest.java
new file mode 100644
index 0000000..9f2c467
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalPropertiesFilteringTest.java
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataproperties;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.operators.DualInputSemanticProperties;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.types.ByteValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.junit.Test;
+
+
+public class RequestedGlobalPropertiesFilteringTest {
+ // Exercises RequestedGlobalProperties.filterBySemanticProperties(props, input) for each kind of partitioning request.
+ private TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>> tupleInfo =
+ new TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>(
+ BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO
+ ); // Eight int fields; passed as every type-information argument of the SemanticPropUtil calls below.
+ // A null semantic-properties argument is expected to raise NullPointerException.
+ @Test(expected = NullPointerException.class)
+ public void testNullProps() {
+
+ RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+ rgProps.setAnyPartitioning(new FieldSet(0,1,2));
+
+ rgProps.filterBySemanticProperties(null, 0);
+ }
+ // Default-constructed semantic properties (nothing added): the any-partitioning request on (0,1,2) is expected to filter to null.
+ @Test
+ public void testEraseAll1() {
+
+ SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+
+ RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+ rgProps.setAnyPartitioning(new FieldSet(0,1,2));
+
+ RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+ assertNull(filtered);
+ }
+ // Properties built from "3;4" mention none of the requested fields (0,1,2): the filtered result is expected to be null.
+ @Test
+ public void testEraseAll2() {
+
+ SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"3;4"}, null, null, tupleInfo, tupleInfo);
+
+ RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+ rgProps.setAnyPartitioning(new FieldSet(0, 1, 2));
+
+ RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+ assertNull(filtered);
+ }
+ // Properties built from "0;3;4" mention every requested field: the hash-partitioning request is expected to survive on (0,3,4).
+ @Test
+ public void testHashPartitioningPreserved1() {
+
+ SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;3;4"}, null, null, tupleInfo, tupleInfo);
+
+ RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+ rgProps.setHashPartitioned(new FieldSet(0, 3, 4));
+
+ RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+ assertNotNull(filtered);
+ assertEquals(PartitioningProperty.HASH_PARTITIONED, filtered.getPartitioning());
+ assertNotNull(filtered.getPartitionedFields());
+ assertEquals(3, filtered.getPartitionedFields().size());
+ assertTrue(filtered.getPartitionedFields().contains(0));
+ assertTrue(filtered.getPartitionedFields().contains(3));
+ assertTrue(filtered.getPartitionedFields().contains(4));
+ assertNull(filtered.getDataDistribution());
+ assertNull(filtered.getCustomPartitioner());
+ assertNull(filtered.getOrdering());
+ }
+ // With "2->0;1->3;7->4" the hash request on (0,3,4) is expected back on (2,1,7), the left-hand sides of the mappings.
+ @Test
+ public void testHashPartitioningPreserved2() {
+
+ SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"2->0;1->3;7->4"}, null, null, tupleInfo, tupleInfo);
+
+ RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+ rgProps.setHashPartitioned(new FieldSet(0, 3, 4));
+
+ RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+ assertNotNull(filtered);
+ assertEquals(PartitioningProperty.HASH_PARTITIONED, filtered.getPartitioning());
+ assertNotNull(filtered.getPartitionedFields());
+ assertEquals(3, filtered.getPartitionedFields().size());
+ assertTrue(filtered.getPartitionedFields().contains(1));
+ assertTrue(filtered.getPartitionedFields().contains(2));
+ assertTrue(filtered.getPartitionedFields().contains(7));
+ assertNull(filtered.getDataDistribution());
+ assertNull(filtered.getCustomPartitioner());
+ assertNull(filtered.getOrdering());
+ }
+ // Properties built from "1;2" mention none of the hash-partitioning fields (0,3,4): the filtered result is expected to be null.
+ @Test
+ public void testHashPartitioningErased() {
+
+ SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"1;2"}, null, null, tupleInfo, tupleInfo);
+
+ RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+ rgProps.setHashPartitioned(new FieldSet(0, 3, 4));
+
+ RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+ assertNull(filtered);
+ }
+ // Properties built from "0;3;4" mention every requested field: the any-partitioning request is expected to survive on (0,3,4).
+ @Test
+ public void testAnyPartitioningPreserved1() {
+
+ SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;3;4"}, null, null, tupleInfo, tupleInfo);
+
+ RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+ rgProps.setAnyPartitioning(new FieldSet(0, 3, 4));
+
+ RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+ assertNotNull(filtered);
+ assertEquals(PartitioningProperty.ANY_PARTITIONING, filtered.getPartitioning());
+ assertNotNull(filtered.getPartitionedFields());
+ assertEquals(3, filtered.getPartitionedFields().size());
+ assertTrue(filtered.getPartitionedFields().contains(0));
+ assertTrue(filtered.getPartitionedFields().contains(3));
+ assertTrue(filtered.getPartitionedFields().contains(4));
+ assertNull(filtered.getDataDistribution());
+ assertNull(filtered.getCustomPartitioner());
+ assertNull(filtered.getOrdering());
+ }
+ // With "2->0;1->3;7->4" the any-partitioning request on (0,3,4) is expected back on (2,1,7), the mappings' left-hand sides.
+ @Test
+ public void testAnyPartitioningPreserved2() {
+
+ SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"2->0;1->3;7->4"}, null, null, tupleInfo, tupleInfo);
+
+ RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+ rgProps.setAnyPartitioning(new FieldSet(0, 3, 4));
+
+ RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+ assertNotNull(filtered);
+ assertEquals(PartitioningProperty.ANY_PARTITIONING, filtered.getPartitioning());
+ assertNotNull(filtered.getPartitionedFields());
+ assertEquals(3, filtered.getPartitionedFields().size());
+ assertTrue(filtered.getPartitionedFields().contains(1));
+ assertTrue(filtered.getPartitionedFields().contains(2));
+ assertTrue(filtered.getPartitionedFields().contains(7));
+ assertNull(filtered.getDataDistribution());
+ assertNull(filtered.getCustomPartitioner());
+ assertNull(filtered.getOrdering());
+ }
+ // Properties built from "1;2" mention none of the any-partitioning fields (0,3,4): the filtered result is expected to be null.
+ @Test
+ public void testAnyPartitioningErased() {
+
+ SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"1;2"}, null, null, tupleInfo, tupleInfo);
+
+ RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+ rgProps.setAnyPartitioning(new FieldSet(0, 3, 4));
+
+ RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+ assertNull(filtered);
+ }
+ // Properties built from "1;3;6" mention every field of the range ordering (3,1,6): fields, types and orders are expected unchanged.
+ @Test
+ public void testRangePartitioningPreserved1() {
+
+ SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"1;3;6"}, null, null, tupleInfo, tupleInfo);
+
+ Ordering o = new Ordering();
+ o.appendOrdering(3, LongValue.class, Order.DESCENDING);
+ o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+ o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
+
+ RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+ rgProps.setRangePartitioned(o);
+
+ RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+ assertNotNull(filtered);
+ assertEquals(PartitioningProperty.RANGE_PARTITIONED, filtered.getPartitioning());
+ assertNotNull(filtered.getOrdering());
+ assertEquals(3, filtered.getOrdering().getNumberOfFields());
+ assertEquals(3, filtered.getOrdering().getFieldNumber(0).intValue());
+ assertEquals(1, filtered.getOrdering().getFieldNumber(1).intValue());
+ assertEquals(6, filtered.getOrdering().getFieldNumber(2).intValue());
+ assertEquals(LongValue.class, filtered.getOrdering().getType(0));
+ assertEquals(IntValue.class, filtered.getOrdering().getType(1));
+ assertEquals(ByteValue.class, filtered.getOrdering().getType(2));
+ assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(0));
+ assertEquals(Order.ASCENDING, filtered.getOrdering().getOrder(1));
+ assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(2));
+ assertNull(filtered.getPartitionedFields());
+ assertNull(filtered.getDataDistribution());
+ assertNull(filtered.getCustomPartitioner());
+ }
+ // With "7->3;1->1;2->6" the range ordering on (3,1,6) is expected back on (7,1,2); types and orders stay at the same positions.
+ @Test
+ public void testRangePartitioningPreserved2() {
+
+ SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"7->3;1->1;2->6"}, null, null, tupleInfo, tupleInfo);
+
+ Ordering o = new Ordering();
+ o.appendOrdering(3, LongValue.class, Order.DESCENDING);
+ o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+ o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
+
+ RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+ rgProps.setRangePartitioned(o);
+
+ RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+ assertNotNull(filtered);
+ assertEquals(PartitioningProperty.RANGE_PARTITIONED, filtered.getPartitioning());
+ assertNotNull(filtered.getOrdering());
+ assertEquals(3, filtered.getOrdering().getNumberOfFields());
+ assertEquals(7, filtered.getOrdering().getFieldNumber(0).intValue());
+ assertEquals(1, filtered.getOrdering().getFieldNumber(1).intValue());
+ assertEquals(2, filtered.getOrdering().getFieldNumber(2).intValue());
+ assertEquals(LongValue.class, filtered.getOrdering().getType(0));
+ assertEquals(IntValue.class, filtered.getOrdering().getType(1));
+ assertEquals(ByteValue.class, filtered.getOrdering().getType(2));
+ assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(0));
+ assertEquals(Order.ASCENDING, filtered.getOrdering().getOrder(1));
+ assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(2));
+ assertNull(filtered.getPartitionedFields());
+ assertNull(filtered.getDataDistribution());
+ assertNull(filtered.getCustomPartitioner());
+ }
+ // Range request with a MockDistribution under "7->3;1->1;2->6": ordering is expected on (7,1,2) and an equal distribution returned.
+ @Test
+ public void testRangePartitioningPreserved3() {
+
+ SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"7->3;1->1;2->6"}, null, null, tupleInfo, tupleInfo);
+
+ DataDistribution dd = new MockDistribution();
+ Ordering o = new Ordering();
+ o.appendOrdering(3, LongValue.class, Order.DESCENDING);
+ o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+ o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
+
+ RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+ rgProps.setRangePartitioned(o, dd);
+
+ RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+ assertNotNull(filtered);
+ assertEquals(PartitioningProperty.RANGE_PARTITIONED, filtered.getPartitioning());
+ assertNotNull(filtered.getOrdering());
+ assertEquals(3, filtered.getOrdering().getNumberOfFields());
+ assertEquals(7, filtered.getOrdering().getFieldNumber(0).intValue());
+ assertEquals(1, filtered.getOrdering().getFieldNumber(1).intValue());
+ assertEquals(2, filtered.getOrdering().getFieldNumber(2).intValue());
+ assertEquals(LongValue.class, filtered.getOrdering().getType(0));
+ assertEquals(IntValue.class, filtered.getOrdering().getType(1));
+ assertEquals(ByteValue.class, filtered.getOrdering().getType(2));
+ assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(0));
+ assertEquals(Order.ASCENDING, filtered.getOrdering().getOrder(1));
+ assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(2));
+ assertNotNull(filtered.getDataDistribution());
+ assertEquals(dd, filtered.getDataDistribution());
+ assertNull(filtered.getPartitionedFields());
+ assertNull(filtered.getCustomPartitioner());
+ }
+ // Properties built from "1;2" mention only field 1 of the range ordering (3,1,6): the filtered result is expected to be null.
+ @Test
+ public void testRangePartitioningErased() {
+
+ SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"1;2"}, null, null, tupleInfo, tupleInfo);
+
+ Ordering o = new Ordering();
+ o.appendOrdering(3, LongValue.class, Order.DESCENDING);
+ o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+ o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
+
+ RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+ rgProps.setRangePartitioned(o);
+
+ RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+ assertNull(filtered);
+ }
+ // Custom partitioning on (0,1,2) is expected to filter to null even though the properties ("0;1;2") mention all of those fields.
+ @Test
+ public void testCustomPartitioningErased() {
+
+ SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;1;2"}, null, null, tupleInfo, tupleInfo);
+
+ RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+ rgProps.setCustomPartitioned(new FieldSet(0, 1, 2), new MockPartitioner());
+
+ RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+ assertNull(filtered);
+ }
+ // A random-partitioning request is expected to filter to null.
+ @Test
+ public void testRandomDistributionErased() {
+
+ SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;1;2"}, null, null, tupleInfo, tupleInfo);
+
+ RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+ rgProps.setRandomPartitioning();
+
+ RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+ assertNull(filtered);
+ }
+ // A full-replication request is expected to filter to null.
+ @Test
+ public void testReplicationErased() {
+
+ SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;1;2"}, null, null, tupleInfo, tupleInfo);
+
+ RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+ rgProps.setFullyReplicated();
+
+ RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+ assertNull(filtered);
+ }
+ // A forced-rebalancing request is expected to filter to null.
+ @Test
+ public void testRebalancingErased() {
+
+ SingleInputSemanticProperties sProp = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;1;2"}, null, null, tupleInfo, tupleInfo);
+
+ RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
+ rgProps.setForceRebalancing();
+
+ RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
+
+ assertNull(filtered);
+ }
+ // Dual-input case: one hash request is filtered with input index 0, a second one with input index 1.
+ @Test
+ public void testDualHashPartitioningPreserved() {
+
+ DualInputSemanticProperties dprops = new DualInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsDualFromString(dprops, new String[]{"0;2;4"}, new String[]{"1->3;4->6;3->7"},
+ null, null, null, null, tupleInfo, tupleInfo, tupleInfo);
+
+ RequestedGlobalProperties gprops1 = new RequestedGlobalProperties();
+ RequestedGlobalProperties gprops2 = new RequestedGlobalProperties();
+ gprops1.setHashPartitioned(new FieldSet(2, 0, 4));
+ gprops2.setHashPartitioned(new FieldSet(3, 6, 7));
+ RequestedGlobalProperties filtered1 = gprops1.filterBySemanticProperties(dprops, 0);
+ RequestedGlobalProperties filtered2 = gprops2.filterBySemanticProperties(dprops, 1);
+ // Expected: input 0 keeps fields (0,2,4); input 1 comes back on (1,4,3), the left-hand sides of "1->3;4->6;3->7".
+ assertNotNull(filtered1);
+ assertEquals(PartitioningProperty.HASH_PARTITIONED, filtered1.getPartitioning());
+ assertNotNull(filtered1.getPartitionedFields());
+ assertEquals(3, filtered1.getPartitionedFields().size());
+ assertTrue(filtered1.getPartitionedFields().contains(0));
+ assertTrue(filtered1.getPartitionedFields().contains(2));
+ assertTrue(filtered1.getPartitionedFields().contains(4));
+ assertNull(filtered1.getOrdering());
+ assertNull(filtered1.getCustomPartitioner());
+ assertNull(filtered1.getDataDistribution());
+
+ assertNotNull(filtered2);
+ assertEquals(PartitioningProperty.HASH_PARTITIONED, filtered2.getPartitioning());
+ assertNotNull(filtered2.getPartitionedFields());
+ assertEquals(3, filtered2.getPartitionedFields().size());
+ assertTrue(filtered2.getPartitionedFields().contains(1));
+ assertTrue(filtered2.getPartitionedFields().contains(3));
+ assertTrue(filtered2.getPartitionedFields().contains(4));
+ assertNull(filtered2.getOrdering());
+ assertNull(filtered2.getCustomPartitioner());
+ assertNull(filtered2.getDataDistribution());
+ }
+ // Filtering with input index 1 against single-input semantic properties is expected to raise IndexOutOfBoundsException.
+ @Test(expected = IndexOutOfBoundsException.class)
+ public void testInvalidInputIndex() {
+ SingleInputSemanticProperties sprops = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sprops, new String[]{"0;1"}, null, null, tupleInfo, tupleInfo);
+
+ RequestedGlobalProperties gprops = new RequestedGlobalProperties();
+ gprops.setHashPartitioned(new FieldSet(0,1));
+
+ gprops.filterBySemanticProperties(sprops, 1);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/RequestedLocalPropertiesFilteringTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/RequestedLocalPropertiesFilteringTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/RequestedLocalPropertiesFilteringTest.java
new file mode 100644
index 0000000..0cede0e
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/RequestedLocalPropertiesFilteringTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataproperties;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.operators.DualInputSemanticProperties;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.types.ByteValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.junit.Test;
+
+public class RequestedLocalPropertiesFilteringTest {
+ // Exercises RequestedLocalProperties.filterBySemanticProperties(props, input) for grouping and ordering requests.
+ private TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>> tupleInfo =
+ new TupleTypeInfo<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>(
+ BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO
+ ); // Eight int fields; passed as every type-information argument of the SemanticPropUtil calls below.
+ // A null semantic-properties argument is expected to raise NullPointerException.
+ @Test(expected = NullPointerException.class)
+ public void testNullProps() {
+
+ RequestedLocalProperties rlProp = new RequestedLocalProperties();
+ rlProp.setGroupedFields(new FieldSet(0, 2, 3));
+
+ rlProp.filterBySemanticProperties(null, 0);
+ }
+ // Default-constructed semantic properties (nothing added): the grouping request on (0,2,3) is expected to filter to null.
+ @Test
+ public void testAllErased() {
+
+ SingleInputSemanticProperties sProps = new SingleInputSemanticProperties();
+
+ RequestedLocalProperties rlProp = new RequestedLocalProperties();
+ rlProp.setGroupedFields(new FieldSet(0, 2, 3));
+
+ RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0);
+
+ assertNull(filtered);
+ }
+ // Properties built from "0;2;3" mention every grouped field: the grouping request is expected to survive on (0,2,3).
+ @Test
+ public void testGroupingPreserved1() {
+
+ SingleInputSemanticProperties sProps = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"0;2;3"}, null, null, tupleInfo, tupleInfo);
+
+ RequestedLocalProperties rlProp = new RequestedLocalProperties();
+ rlProp.setGroupedFields(new FieldSet(0, 2, 3));
+
+ RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0);
+
+ assertNotNull(filtered);
+ assertNotNull(filtered.getGroupedFields());
+ assertEquals(3, filtered.getGroupedFields().size());
+ assertTrue(filtered.getGroupedFields().contains(0));
+ assertTrue(filtered.getGroupedFields().contains(2));
+ assertTrue(filtered.getGroupedFields().contains(3));
+ assertNull(filtered.getOrdering());
+ }
+ // With "3->0;5->2;1->3" the grouping request on (0,2,3) is expected back on (3,5,1), the left-hand sides of the mappings.
+ @Test
+ public void testGroupingPreserved2() {
+
+ SingleInputSemanticProperties sProps = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"3->0;5->2;1->3"}, null, null, tupleInfo, tupleInfo);
+
+ RequestedLocalProperties rlProp = new RequestedLocalProperties();
+ rlProp.setGroupedFields(new FieldSet(0, 2, 3));
+
+ RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0);
+
+ assertNotNull(filtered);
+ assertNotNull(filtered.getGroupedFields());
+ assertEquals(3, filtered.getGroupedFields().size());
+ assertTrue(filtered.getGroupedFields().contains(3));
+ assertTrue(filtered.getGroupedFields().contains(5));
+ assertTrue(filtered.getGroupedFields().contains(1));
+ assertNull(filtered.getOrdering());
+ }
+ // Properties built from "0;2" leave out grouped field 3 of (0,2,3): the filtered result is expected to be null.
+ @Test
+ public void testGroupingErased() {
+
+ SingleInputSemanticProperties sProps = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"0;2"}, null, null, tupleInfo, tupleInfo);
+
+ RequestedLocalProperties rlProp = new RequestedLocalProperties();
+ rlProp.setGroupedFields(new FieldSet(0, 2, 3));
+
+ RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0);
+
+ assertNull(filtered);
+ }
+ // Properties built from "1;4;6" mention every field of the ordering (4,1,6): fields, types and orders are expected unchanged.
+ @Test
+ public void testOrderPreserved1() {
+
+ SingleInputSemanticProperties sProps = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"1;4;6"}, null, null, tupleInfo, tupleInfo);
+
+ Ordering o = new Ordering();
+ o.appendOrdering(4, LongValue.class, Order.DESCENDING);
+ o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+ o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
+
+ RequestedLocalProperties rlProp = new RequestedLocalProperties();
+ rlProp.setOrdering(o);
+
+ RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0);
+
+ assertNotNull(filtered);
+ assertNotNull(filtered.getOrdering());
+ assertEquals(3, filtered.getOrdering().getNumberOfFields());
+ assertEquals(4, filtered.getOrdering().getFieldNumber(0).intValue());
+ assertEquals(1, filtered.getOrdering().getFieldNumber(1).intValue());
+ assertEquals(6, filtered.getOrdering().getFieldNumber(2).intValue());
+ assertEquals(LongValue.class, filtered.getOrdering().getType(0));
+ assertEquals(IntValue.class, filtered.getOrdering().getType(1));
+ assertEquals(ByteValue.class, filtered.getOrdering().getType(2));
+ assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(0));
+ assertEquals(Order.ASCENDING, filtered.getOrdering().getOrder(1));
+ assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(2));
+ assertNull(filtered.getGroupedFields());
+ }
+ // With "5->1;0->4;2->6" the ordering on (4,1,6) is expected back on (0,5,2); types and orders stay at the same positions.
+ @Test
+ public void testOrderPreserved2() {
+
+ SingleInputSemanticProperties sProps = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"5->1;0->4;2->6"}, null, null, tupleInfo, tupleInfo);
+
+ Ordering o = new Ordering();
+ o.appendOrdering(4, LongValue.class, Order.DESCENDING);
+ o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+ o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
+
+ RequestedLocalProperties rlProp = new RequestedLocalProperties();
+ rlProp.setOrdering(o);
+
+ RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0);
+
+ assertNotNull(filtered);
+ assertNotNull(filtered.getOrdering());
+ assertEquals(3, filtered.getOrdering().getNumberOfFields());
+ assertEquals(0, filtered.getOrdering().getFieldNumber(0).intValue());
+ assertEquals(5, filtered.getOrdering().getFieldNumber(1).intValue());
+ assertEquals(2, filtered.getOrdering().getFieldNumber(2).intValue());
+ assertEquals(LongValue.class, filtered.getOrdering().getType(0));
+ assertEquals(IntValue.class, filtered.getOrdering().getType(1));
+ assertEquals(ByteValue.class, filtered.getOrdering().getType(2));
+ assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(0));
+ assertEquals(Order.ASCENDING, filtered.getOrdering().getOrder(1));
+ assertEquals(Order.DESCENDING, filtered.getOrdering().getOrder(2));
+ assertNull(filtered.getGroupedFields());
+ }
+ // Properties built from "1; 4" leave out ordering field 6 of (4,1,6): the filtered result is expected to be null.
+ @Test
+ public void testOrderErased() {
+
+ SingleInputSemanticProperties sProps = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"1; 4"}, null, null, tupleInfo, tupleInfo);
+
+ Ordering o = new Ordering();
+ o.appendOrdering(4, LongValue.class, Order.DESCENDING);
+ o.appendOrdering(1, IntValue.class, Order.ASCENDING);
+ o.appendOrdering(6, ByteValue.class, Order.DESCENDING);
+
+ RequestedLocalProperties rlProp = new RequestedLocalProperties();
+ rlProp.setOrdering(o);
+
+ RequestedLocalProperties filtered = rlProp.filterBySemanticProperties(sProps, 0);
+
+ assertNull(filtered);
+ }
+ // Dual-input case: one grouping request is filtered with input index 0, a second one with input index 1.
+ @Test
+ public void testDualGroupingPreserved() {
+
+ DualInputSemanticProperties dprops = new DualInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsDualFromString(dprops, new String[]{"1->0;3;2->4"}, new String[]{"0->7;1"},
+ null, null, null, null, tupleInfo, tupleInfo, tupleInfo);
+
+ RequestedLocalProperties lprops1 = new RequestedLocalProperties();
+ lprops1.setGroupedFields(new FieldSet(0,3,4));
+
+ RequestedLocalProperties lprops2 = new RequestedLocalProperties();
+ lprops2.setGroupedFields(new FieldSet(7, 1));
+
+ RequestedLocalProperties filtered1 = lprops1.filterBySemanticProperties(dprops, 0);
+ RequestedLocalProperties filtered2 = lprops2.filterBySemanticProperties(dprops, 1);
+ // Expected: input 0 turns (0,3,4) into (1,3,2) per "1->0;3;2->4"; input 1 turns (7,1) into (0,1) per "0->7;1".
+ assertNotNull(filtered1);
+ assertNotNull(filtered1.getGroupedFields());
+ assertEquals(3, filtered1.getGroupedFields().size());
+ assertTrue(filtered1.getGroupedFields().contains(1));
+ assertTrue(filtered1.getGroupedFields().contains(2));
+ assertTrue(filtered1.getGroupedFields().contains(3));
+ assertNull(filtered1.getOrdering());
+
+ assertNotNull(filtered2);
+ assertNotNull(filtered2.getGroupedFields());
+ assertEquals(2, filtered2.getGroupedFields().size());
+ assertTrue(filtered2.getGroupedFields().contains(0));
+ assertTrue(filtered2.getGroupedFields().contains(1));
+ assertNull(filtered2.getOrdering());
+ }
+ // Filtering with input index 1 against single-input semantic properties is expected to raise IndexOutOfBoundsException.
+ @Test(expected = IndexOutOfBoundsException.class)
+ public void testInvalidInputIndex() {
+
+ SingleInputSemanticProperties sProps = new SingleInputSemanticProperties();
+ SemanticPropUtil.getSemanticPropsSingleFromString(sProps, new String[]{"1; 4"}, null, null, tupleInfo, tupleInfo);
+
+ RequestedLocalProperties rlProp = new RequestedLocalProperties();
+ rlProp.setGroupedFields(new FieldSet(1, 4));
+
+ rlProp.filterBySemanticProperties(sProps, 1);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java
new file mode 100644
index 0000000..b359e6b
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.java;
+
+import static org.junit.Assert.fail;
+
+import org.junit.Test;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.CompilerTestBase;
+
+/** Checks that the optimizer rejects a delta iteration whose next workset does not depend on the workset. */
+@SuppressWarnings({"serial", "unchecked"})
+public class DeltaIterationDependenciesTest extends CompilerTestBase {
+ /** Joins only the solution set and the outer input into the next workset and expects compileNoStats to throw a CompilerException. */
+ @Test
+ public void testExceptionWhenNewWorksetNotDependentOnWorkset() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+
+ DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> deltaIteration = input.iterateDelta(input, 10,0);
+
+ DataSet<Tuple2<Long, Long>> delta = deltaIteration.getSolutionSet().join(deltaIteration.getWorkset())
+ .where(0).equalTo(0)
+ .projectFirst(1).projectSecond(1);
+ // the next workset is built from the solution set and the outer input only; the workset is not used
+ DataSet<Tuple2<Long, Long>> nextWorkset = deltaIteration.getSolutionSet().join(input)
+ .where(0).equalTo(0)
+ .projectFirst(1).projectSecond(1);
+
+
+ DataSet<Tuple2<Long, Long>> result = deltaIteration.closeWith(delta, nextWorkset);
+
+ result.print();
+
+ Plan p = env.createProgramPlan();
+ try {
+ compileNoStats(p);
+ fail("Should not be able to compile, since the next workset does not depend on the workset");
+ }
+ catch (CompilerException e) {
+ // expected: the optimizer rejected the plan
+ }
+ catch (Exception e) { // does not intercept the AssertionError raised by fail() above
+ fail("wrong exception type");
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
new file mode 100644
index 0000000..de02836
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.java;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.CompilerTestBase;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+/** Checks the ship strategies the optimizer picks when a grouped aggregation follows a distinct. */
+@SuppressWarnings("serial")
+public class DistinctAndGroupingOptimizerTest extends CompilerTestBase {
+ /** distinct(0) then groupBy(0).sum(1): the aggregation input is expected to be FORWARD, the distinct input PARTITION_HASH. */
+ @Test
+ public void testDistinctPreservesPartitioningOfDistinctFields() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(4);
+
+ @SuppressWarnings("unchecked")
+ DataSet<Tuple2<Long, Long>> data = env.fromElements(new Tuple2<Long, Long>(0L, 0L), new Tuple2<Long, Long>(1L, 1L))
+ .map(new IdentityMapper<Tuple2<Long,Long>>()).setParallelism(4);
+
+ data.distinct(0)
+ .groupBy(0)
+ .sum(1)
+ .print();
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+ // walk the plan backwards from the sink: sink <- reducer <- distinctReducer
+ SinkPlanNode sink = op.getDataSinks().iterator().next();
+ SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
+ SingleInputPlanNode distinctReducer = (SingleInputPlanNode) reducer.getInput().getSource();
+
+ assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+
+ // reducer can be forward, reuses partitioning from distinct
+ assertEquals(ShipStrategyType.FORWARD, reducer.getInput().getShipStrategy());
+
+ // distinct reducer is partitioned
+ assertEquals(ShipStrategyType.PARTITION_HASH, distinctReducer.getInput().getShipStrategy());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+ /** distinct(1) then groupBy(0).sum(1): the aggregation input is expected to be PARTITION_HASH, fed by a combiner whose own input is FORWARD. */
+ @Test
+ public void testDistinctDestroysPartitioningOfNonDistinctFields() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(4);
+
+ @SuppressWarnings("unchecked")
+ DataSet<Tuple2<Long, Long>> data = env.fromElements(new Tuple2<Long, Long>(0L, 0L), new Tuple2<Long, Long>(1L, 1L))
+ .map(new IdentityMapper<Tuple2<Long,Long>>()).setParallelism(4);
+
+ data.distinct(1)
+ .groupBy(0)
+ .sum(1)
+ .print();
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+ // walk the plan backwards from the sink: sink <- reducer <- combiner <- distinctReducer
+ SinkPlanNode sink = op.getDataSinks().iterator().next();
+ SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
+ SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
+ SingleInputPlanNode distinctReducer = (SingleInputPlanNode) combiner.getInput().getSource();
+
+ assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+
+ // reducer must repartition, because it works on a different field
+ assertEquals(ShipStrategyType.PARTITION_HASH, reducer.getInput().getShipStrategy());
+ // the combiner input is expected to be forwarded
+ assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
+
+ // distinct reducer is partitioned
+ assertEquals(ShipStrategyType.PARTITION_HASH, distinctReducer.getInput().getShipStrategy());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
new file mode 100644
index 0000000..a683968
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.java;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.GroupReduceOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.optimizer.CompilerTestBase;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.SourcePlanNode;
+
+import static org.junit.Assert.*;
+/** Checks the optimized plans (wiring, driver strategies, keys, parallelism) produced for group-reduce programs. */
+@SuppressWarnings("serial")
+public class GroupReduceCompilationTest extends CompilerTestBase implements java.io.Serializable {
+ /** Non-grouped reduceGroup over a fromElements source: expects ALL_GROUP_REDUCE fed directly by the source, all nodes at parallelism 1. */
+ @Test
+ public void testAllGroupReduceNoCombiner() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(8);
+
+ DataSet<Double> data = env.fromElements(0.2, 0.3, 0.4, 0.5).name("source");
+
+ data.reduceGroup(new RichGroupReduceFunction<Double, Double>() {
+ public void reduce(Iterable<Double> values, Collector<Double> out) {}
+ }).name("reducer")
+ .print().name("sink");
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+
+ // the all-reduce has no combiner when the DOP (degree of parallelism) of its input is one
+
+ SourcePlanNode sourceNode = resolver.getNode("source");
+ SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+ SinkPlanNode sinkNode = resolver.getNode("sink");
+
+ // check wiring
+ assertEquals(sourceNode, reduceNode.getInput().getSource());
+ assertEquals(reduceNode, sinkNode.getInput().getSource());
+
+ // check that reduce has the right strategy
+ assertEquals(DriverStrategy.ALL_GROUP_REDUCE, reduceNode.getDriverStrategy());
+
+ // check DOP
+ assertEquals(1, sourceNode.getParallelism());
+ assertEquals(1, reduceNode.getParallelism());
+ assertEquals(1, sinkNode.getParallelism());
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+ }
+ }
+ /** Non-grouped combinable reduceGroup over generateSequence: expects ALL_GROUP_REDUCE_COMBINE at parallelism 8 feeding ALL_GROUP_REDUCE at parallelism 1. */
+ @Test
+ public void testAllReduceWithCombiner() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(8);
+
+ DataSet<Long> data = env.generateSequence(1, 8000000).name("source");
+
+ GroupReduceOperator<Long, Long> reduced = data.reduceGroup(new RichGroupReduceFunction<Long, Long>() {
+ public void reduce(Iterable<Long> values, Collector<Long> out) {}
+ }).name("reducer");
+
+ reduced.setCombinable(true);
+ reduced.print().name("sink");
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+ // get the original nodes
+ SourcePlanNode sourceNode = resolver.getNode("source");
+ SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+ SinkPlanNode sinkNode = resolver.getNode("sink");
+
+ // get the combiner
+ SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
+
+ // check wiring
+ assertEquals(sourceNode, combineNode.getInput().getSource());
+ assertEquals(reduceNode, sinkNode.getInput().getSource());
+
+ // check the driver strategies of the reducer and of its combiner
+ assertEquals(DriverStrategy.ALL_GROUP_REDUCE, reduceNode.getDriverStrategy());
+ assertEquals(DriverStrategy.ALL_GROUP_REDUCE_COMBINE, combineNode.getDriverStrategy());
+
+ // check DOP
+ assertEquals(8, sourceNode.getParallelism());
+ assertEquals(8, combineNode.getParallelism());
+ assertEquals(1, reduceNode.getParallelism());
+ assertEquals(1, sinkNode.getParallelism());
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+ }
+ }
+
+ /** groupBy(1).reduceGroup without combiner: expects SORTED_GROUP_REDUCE keyed on field 1, fed directly by the source (parallelism 6, reducer and sink 8). */
+ @Test
+ public void testGroupedReduceWithFieldPositionKeyNonCombinable() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(8);
+
+ DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+ .name("source").setParallelism(6);
+
+ data
+ .groupBy(1)
+ .reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
+ public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
+ }).name("reducer")
+ .print().name("sink");
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+ // get the original nodes
+ SourcePlanNode sourceNode = resolver.getNode("source");
+ SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+ SinkPlanNode sinkNode = resolver.getNode("sink");
+
+ // check wiring
+ assertEquals(sourceNode, reduceNode.getInput().getSource());
+ assertEquals(reduceNode, sinkNode.getInput().getSource());
+
+ // check that reduce has the right strategy (there is no combiner in this plan)
+ assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
+
+ // check the keys
+ assertEquals(new FieldList(1), reduceNode.getKeys(0));
+ assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
+
+ // check DOP
+ assertEquals(6, sourceNode.getParallelism());
+ assertEquals(8, reduceNode.getParallelism());
+ assertEquals(8, sinkNode.getParallelism());
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+ }
+ }
+ /** groupBy(1).reduceGroup with setCombinable(true): expects SORTED_GROUP_COMBINE (parallelism 6) feeding SORTED_GROUP_REDUCE (8), both keyed on field 1. */
+ @Test
+ public void testGroupedReduceWithFieldPositionKeyCombinable() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(8);
+
+ DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+ .name("source").setParallelism(6);
+
+ GroupReduceOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data
+ .groupBy(1)
+ .reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
+ public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
+ }).name("reducer");
+
+ reduced.setCombinable(true);
+ reduced.print().name("sink");
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+ // get the original nodes
+ SourcePlanNode sourceNode = resolver.getNode("source");
+ SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+ SinkPlanNode sinkNode = resolver.getNode("sink");
+
+ // get the combiner
+ SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
+
+ // check wiring
+ assertEquals(sourceNode, combineNode.getInput().getSource());
+ assertEquals(reduceNode, sinkNode.getInput().getSource());
+
+ // check the driver strategies of the reducer and of its combiner
+ assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
+ assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
+
+ // check the keys
+ assertEquals(new FieldList(1), reduceNode.getKeys(0));
+ assertEquals(new FieldList(1), combineNode.getKeys(0));
+ assertEquals(new FieldList(1), combineNode.getKeys(1));
+ assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
+
+ // check DOP
+ assertEquals(6, sourceNode.getParallelism());
+ assertEquals(6, combineNode.getParallelism());
+ assertEquals(8, reduceNode.getParallelism());
+ assertEquals(8, sinkNode.getParallelism());
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+ }
+ }
+ /** KeySelector-grouped reduceGroup without setCombinable: expects a node between source and reducer (parallelism 6) and SORTED_GROUP_REDUCE keyed on field 0 (8). */
+ @Test
+ public void testGroupedReduceWithSelectorFunctionKeyNoncombinable() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(8);
+
+ DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+ .name("source").setParallelism(6);
+
+ data
+ .groupBy(new KeySelector<Tuple2<String,Double>, String>() {
+ public String getKey(Tuple2<String, Double> value) { return value.f0; }
+ })
+ .reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
+ public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
+ }).name("reducer")
+ .print().name("sink");
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+ // get the original nodes
+ SourcePlanNode sourceNode = resolver.getNode("source");
+ SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+ SinkPlanNode sinkNode = resolver.getNode("sink");
+
+ // get the key extractors and projectors
+ SingleInputPlanNode keyExtractor = (SingleInputPlanNode) reduceNode.getInput().getSource();
+ SingleInputPlanNode keyProjector = (SingleInputPlanNode) sinkNode.getInput().getSource();
+
+ // check wiring
+ assertEquals(sourceNode, keyExtractor.getInput().getSource());
+ assertEquals(keyProjector, sinkNode.getInput().getSource()); // NOTE(review): keyProjector was read from this same expression, so this check cannot fail
+
+ // check that reduce has the right strategy
+ assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
+
+ // check the keys
+ assertEquals(new FieldList(0), reduceNode.getKeys(0));
+ assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
+
+ // check DOP
+ assertEquals(6, sourceNode.getParallelism());
+ assertEquals(6, keyExtractor.getParallelism());
+
+ assertEquals(8, reduceNode.getParallelism());
+ assertEquals(8, keyProjector.getParallelism());
+ assertEquals(8, sinkNode.getParallelism());
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+ }
+ }
+ /** KeySelector-grouped reduceGroup with setCombinable(true): expects key extractor (6), then SORTED_GROUP_COMBINE (6), then SORTED_GROUP_REDUCE (8), keyed on field 0. */
+ @Test
+ public void testGroupedReduceWithSelectorFunctionKeyCombinable() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(8);
+
+ DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+ .name("source").setParallelism(6);
+
+ GroupReduceOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data
+ .groupBy(new KeySelector<Tuple2<String,Double>, String>() {
+ public String getKey(Tuple2<String, Double> value) { return value.f0; }
+ })
+ .reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
+ public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
+ }).name("reducer");
+
+ reduced.setCombinable(true);
+ reduced.print().name("sink");
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+ // get the original nodes
+ SourcePlanNode sourceNode = resolver.getNode("source");
+ SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+ SinkPlanNode sinkNode = resolver.getNode("sink");
+
+ // get the combiner
+ SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
+
+ // get the key extractors and projectors
+ SingleInputPlanNode keyExtractor = (SingleInputPlanNode) combineNode.getInput().getSource();
+ SingleInputPlanNode keyProjector = (SingleInputPlanNode) sinkNode.getInput().getSource();
+
+ // check wiring
+ assertEquals(sourceNode, keyExtractor.getInput().getSource());
+ assertEquals(keyProjector, sinkNode.getInput().getSource()); // NOTE(review): keyProjector was read from this same expression, so this check cannot fail
+
+ // check the driver strategies of the reducer and of its combiner
+ assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
+ assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
+
+ // check the keys
+ assertEquals(new FieldList(0), reduceNode.getKeys(0));
+ assertEquals(new FieldList(0), combineNode.getKeys(0));
+ assertEquals(new FieldList(0), combineNode.getKeys(1));
+ assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
+
+ // check DOP
+ assertEquals(6, sourceNode.getParallelism());
+ assertEquals(6, keyExtractor.getParallelism());
+ assertEquals(6, combineNode.getParallelism());
+
+ assertEquals(8, reduceNode.getParallelism());
+ assertEquals(8, keyProjector.getParallelism());
+ assertEquals(8, sinkNode.getParallelism());
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
new file mode 100644
index 0000000..37a8e81
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.java;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.CompilerTestBase;
+import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
+import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
+import org.junit.Test;
+/** Checks that bulk and delta iterations with degenerate or union-rooted step functions can be optimized and translated into a job graph. */
+@SuppressWarnings("serial")
+public class IterationCompilerTest extends CompilerTestBase {
+ /** A bulk iteration closed directly with itself must pass compileNoStats and the job graph generator without an exception. */
+ @Test
+ public void testIdentityIteration() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(43);
+
+ IterativeDataSet<Long> iteration = env.generateSequence(-4, 1000).iterate(100);
+ iteration.closeWith(iteration).print();
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+ // see that the jobgraph generator can translate this
+ new JobGraphGenerator().compileJobGraph(op);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+ /** A delta iteration closed with its own workset for both closeWith arguments must pass compileNoStats and the job graph generator. */
+ @Test
+ public void testEmptyWorksetIteration() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(43);
+
+ DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 20)
+ .map(new MapFunction<Long, Tuple2<Long, Long>>() {
+ @Override
+ public Tuple2<Long, Long> map(Long value){ return null; }
+ });
+
+
+ DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iter = input.iterateDelta(input, 100, 0);
+ iter.closeWith(iter.getWorkset(), iter.getWorkset())
+ .print();
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+ // see that the jobgraph generator can translate this
+ new JobGraphGenerator().compileJobGraph(op);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+ /** A bulk iteration whose step function ends in a union: the node after the union and the union itself must be on the dynamic path. */
+ @Test
+ public void testIterationWithUnionRoot() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(43);
+
+ IterativeDataSet<Long> iteration = env.generateSequence(-4, 1000).iterate(100);
+
+ iteration.closeWith(
+ iteration.map(new IdentityMapper<Long>()).union(iteration.map(new IdentityMapper<Long>())))
+ .print();
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ SinkPlanNode sink = op.getDataSinks().iterator().next();
+ BulkIterationPlanNode iterNode = (BulkIterationPlanNode) sink.getInput().getSource();
+
+ // make sure that the root is part of the dynamic path
+
+ // the "NoOp" that comes after the union.
+ SingleInputPlanNode noop = (SingleInputPlanNode) iterNode.getRootOfStepFunction();
+ NAryUnionPlanNode union = (NAryUnionPlanNode) noop.getInput().getSource();
+
+ assertTrue(noop.isOnDynamicPath());
+ assertTrue(noop.getCostWeight() >= 1);
+
+ assertTrue(union.isOnDynamicPath());
+ assertTrue(union.getCostWeight() >= 1);
+
+ // see that the jobgraph generator can translate this
+ new JobGraphGenerator().compileJobGraph(op);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+ /** A delta iteration whose two closeWith arguments both end in a union: the nodes after the unions and the unions themselves must be on the dynamic path. */
+ @Test
+ public void testWorksetIterationWithUnionRoot() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(43);
+
+ DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 20)
+ .map(new MapFunction<Long, Tuple2<Long, Long>>() {
+ @Override
+ public Tuple2<Long, Long> map(Long value){ return null; }
+ });
+
+
+ DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iter = input.iterateDelta(input, 100, 0);
+ iter.closeWith(
+ iter.getWorkset().map(new IdentityMapper<Tuple2<Long,Long>>())
+ .union(
+ iter.getWorkset().map(new IdentityMapper<Tuple2<Long,Long>>()))
+ , iter.getWorkset().map(new IdentityMapper<Tuple2<Long,Long>>())
+ .union(
+ iter.getWorkset().map(new IdentityMapper<Tuple2<Long,Long>>()))
+ )
+ .print();
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ SinkPlanNode sink = op.getDataSinks().iterator().next();
+ WorksetIterationPlanNode iterNode = (WorksetIterationPlanNode) sink.getInput().getSource();
+
+ // make sure that the root is part of the dynamic path
+
+ // the "NoOp"s that come after the unions.
+ SingleInputPlanNode nextWorksetNoop = (SingleInputPlanNode) iterNode.getNextWorkSetPlanNode();
+ SingleInputPlanNode solutionDeltaNoop = (SingleInputPlanNode) iterNode.getSolutionSetDeltaPlanNode();
+
+ NAryUnionPlanNode nextWorksetUnion = (NAryUnionPlanNode) nextWorksetNoop.getInput().getSource();
+ NAryUnionPlanNode solutionDeltaUnion = (NAryUnionPlanNode) solutionDeltaNoop.getInput().getSource();
+
+ assertTrue(nextWorksetNoop.isOnDynamicPath());
+ assertTrue(nextWorksetNoop.getCostWeight() >= 1);
+
+ assertTrue(solutionDeltaNoop.isOnDynamicPath());
+ assertTrue(solutionDeltaNoop.getCostWeight() >= 1);
+
+ assertTrue(nextWorksetUnion.isOnDynamicPath());
+ assertTrue(nextWorksetUnion.getCostWeight() >= 1);
+
+ assertTrue(solutionDeltaUnion.isOnDynamicPath());
+ assertTrue(solutionDeltaUnion.getCostWeight() >= 1);
+ // see that the jobgraph generator can translate this
+ new JobGraphGenerator().compileJobGraph(op);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java
new file mode 100644
index 0000000..0a62132
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.java;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.GenericDataSourceBase;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.optimizer.CompilerTestBase;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.util.Visitor;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class JoinTranslationTest extends CompilerTestBase {
+ // Each test compiles a two-input join with one JoinHint and checks the ship strategy of both inputs and the join's driver strategy.
+ @Test
+ public void testBroadcastHashFirstTest() {
+ try {
+ DualInputPlanNode node = createPlanAndGetJoinNode(JoinHint.BROADCAST_HASH_FIRST);
+ assertEquals(ShipStrategyType.BROADCAST, node.getInput1().getShipStrategy());
+ assertEquals(ShipStrategyType.FORWARD, node.getInput2().getShipStrategy());
+ assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST, node.getDriverStrategy());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getClass().getSimpleName() + ": " + e.getMessage());
+ }
+ }
+ // Hint BROADCAST_HASH_SECOND: expects input 1 FORWARD, input 2 BROADCAST, driver HYBRIDHASH_BUILD_SECOND.
+ @Test
+ public void testBroadcastHashSecondTest() {
+ try {
+ DualInputPlanNode node = createPlanAndGetJoinNode(JoinHint.BROADCAST_HASH_SECOND);
+ assertEquals(ShipStrategyType.FORWARD, node.getInput1().getShipStrategy());
+ assertEquals(ShipStrategyType.BROADCAST, node.getInput2().getShipStrategy());
+ assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, node.getDriverStrategy());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getClass().getSimpleName() + ": " + e.getMessage());
+ }
+ }
+ // Hint REPARTITION_HASH_FIRST: expects both inputs PARTITION_HASH, driver HYBRIDHASH_BUILD_FIRST.
+ @Test
+ public void testPartitionHashFirstTest() {
+ try {
+ DualInputPlanNode node = createPlanAndGetJoinNode(JoinHint.REPARTITION_HASH_FIRST);
+ assertEquals(ShipStrategyType.PARTITION_HASH, node.getInput1().getShipStrategy());
+ assertEquals(ShipStrategyType.PARTITION_HASH, node.getInput2().getShipStrategy());
+ assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST, node.getDriverStrategy());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getClass().getSimpleName() + ": " + e.getMessage());
+ }
+ }
+ // Hint REPARTITION_HASH_SECOND: expects both inputs PARTITION_HASH, driver HYBRIDHASH_BUILD_SECOND.
+ @Test
+ public void testPartitionHashSecondTest() {
+ try {
+ DualInputPlanNode node = createPlanAndGetJoinNode(JoinHint.REPARTITION_HASH_SECOND);
+ assertEquals(ShipStrategyType.PARTITION_HASH, node.getInput1().getShipStrategy());
+ assertEquals(ShipStrategyType.PARTITION_HASH, node.getInput2().getShipStrategy());
+ assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, node.getDriverStrategy());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getClass().getSimpleName() + ": " + e.getMessage());
+ }
+ }
+ // Hint REPARTITION_SORT_MERGE: expects both inputs PARTITION_HASH, driver MERGE.
+ @Test
+ public void testPartitionSortMergeTest() {
+ try {
+ DualInputPlanNode node = createPlanAndGetJoinNode(JoinHint.REPARTITION_SORT_MERGE);
+ assertEquals(ShipStrategyType.PARTITION_HASH, node.getInput1().getShipStrategy());
+ assertEquals(ShipStrategyType.PARTITION_HASH, node.getInput2().getShipStrategy());
+ assertEquals(DriverStrategy.MERGE, node.getDriverStrategy());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getClass().getSimpleName() + ": " + e.getMessage());
+ }
+ }
+ // Hint OPTIMIZER_CHOOSES: expects both inputs PARTITION_HASH and accepts either HYBRIDHASH_BUILD_FIRST or HYBRIDHASH_BUILD_SECOND as driver.
+ @Test
+ public void testOptimizerChoosesTest() {
+ try {
+ DualInputPlanNode node = createPlanAndGetJoinNode(JoinHint.OPTIMIZER_CHOOSES);
+ assertEquals(ShipStrategyType.PARTITION_HASH, node.getInput1().getShipStrategy());
+ assertEquals(ShipStrategyType.PARTITION_HASH, node.getInput2().getShipStrategy());
+ assertTrue(DriverStrategy.HYBRIDHASH_BUILD_FIRST == node.getDriverStrategy() ||
+ DriverStrategy.HYBRIDHASH_BUILD_SECOND == node.getDriverStrategy());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getClass().getSimpleName() + ": " + e.getMessage());
+ }
+ }
+
+ /** Joins two generateSequence(1, 1000) data sets on an identity key with the given hint, compiles the plan with source statistics, and returns the node feeding the first sink as the join node. */
+ private DualInputPlanNode createPlanAndGetJoinNode(JoinHint hint) {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Long> i1 = env.generateSequence(1, 1000);
+ DataSet<Long> i2 = env.generateSequence(1, 1000);
+
+ i1.join(i2, hint).where(new IdentityKeySelector<Long>()).equalTo(new IdentityKeySelector<Long>()).print();
+
+ Plan plan = env.createProgramPlan();
+
+ // attach statistics to every data source; NOTE(review): the meaning of the arguments (10000000, 1000) is defined by setSourceStatistics, which is not visible here
+ plan.accept(new Visitor<Operator<?>>() {
+ @Override
+ public boolean preVisit(Operator<?> visitable) {
+ if (visitable instanceof GenericDataSourceBase) {
+ GenericDataSourceBase<?, ?> source = (GenericDataSourceBase<?, ?>) visitable;
+ setSourceStatistics(source, 10000000, 1000);
+ }
+
+ return true;
+ }
+
+ @Override
+ public void postVisit(Operator<?> visitable) {} // nothing to do after an operator has been visited
+ });
+
+ OptimizedPlan op = compileWithStats(plan);
+ // the join is expected to be the direct predecessor of the first data sink; the cast fails otherwise
+ return (DualInputPlanNode) ((SinkPlanNode) op.getDataSinks().iterator().next()).getInput().getSource();
+ }
+
+
+ /** Key selector that returns the record itself as its key. */
+ private static final class IdentityKeySelector<T> implements KeySelector<T, T> {
+
+ @Override
+ public T getKey(T value) {
+ return value;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java
new file mode 100644
index 0000000..cd63b72
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.java;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.CompilerTestBase;
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class OpenIterationTest extends CompilerTestBase {
+ // Each test attaches a sink or an operator to data inside a bulk/delta iteration and expects an InvalidProgramException.
+ @Test
+ public void testSinkInOpenBulkIteration() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Long> input = env.generateSequence(1, 10);
+
+ IterativeDataSet<Long> iteration = input.iterate(10);
+
+ DataSet<Long> mapped = iteration.map(new IdentityMapper<Long>());
+ // sink on a data set inside the bulk iteration; the iteration is never closed with closeWith(...)
+ mapped.print();
+ // creating the program plan is what must throw here
+ try {
+ env.createProgramPlan();
+ fail("should throw an exception");
+ }
+ catch (InvalidProgramException e) {
+ // expected
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSinkInClosedBulkIteration() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Long> input = env.generateSequence(1, 10);
+
+ IterativeDataSet<Long> iteration = input.iterate(10);
+
+ DataSet<Long> mapped = iteration.map(new IdentityMapper<Long>());
+ // the iteration is closed properly and its result is printed ...
+ iteration.closeWith(mapped).print();
+ // ... but a second sink is attached to the data set that lives inside the iteration
+ mapped.print();
+ // plan creation is outside the inner try: an exception here reaches the outer catch and fails the test
+ Plan p = env.createProgramPlan();
+ // the rejection is expected from compileNoStats
+ try {
+ compileNoStats(p);
+ fail("should throw an exception");
+ }
+ catch (InvalidProgramException e) {
+ // expected
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSinkOnSolutionSetDeltaIteration() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ @SuppressWarnings("unchecked")
+ DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<Long, Long>(0L,0L));
+
+ DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = input.iterateDelta(input, 10, 0);
+ // map over the solution set of a delta iteration that is never closed, then attach a sink to it
+ DataSet<Tuple2<Long, Long>> mapped = iteration.getSolutionSet().map(new IdentityMapper<Tuple2<Long, Long>>());
+
+ mapped.print();
+ // creating the program plan is what must throw here
+ try {
+ env.createProgramPlan();
+ fail("should throw an exception");
+ }
+ catch (InvalidProgramException e) {
+ // expected
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSinkOnWorksetDeltaIteration() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ @SuppressWarnings("unchecked")
+ DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<Long, Long>(0L,0L));
+
+ DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = input.iterateDelta(input, 10, 0);
+ // map over the workset of a delta iteration that is never closed, then attach a sink to it
+ DataSet<Tuple2<Long, Long>> mapped = iteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>());
+
+ mapped.print();
+ // creating the program plan is what must throw here
+ try {
+ env.createProgramPlan();
+ fail("should throw an exception");
+ }
+ catch (InvalidProgramException e) {
+ // expected
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testOperationOnSolutionSet() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ @SuppressWarnings("unchecked")
+ DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<Long, Long>(0L,0L));
+
+ DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = input.iterateDelta(input, 10, 0);
+ // a map is applied directly on the solution set ...
+ DataSet<Tuple2<Long, Long>> mapped = iteration.getSolutionSet().map(new IdentityMapper<Tuple2<Long, Long>>());
+ // ... and the workset is joined with that mapped solution set
+ DataSet<Tuple2<Long, Long>> joined = iteration.getWorkset().join(mapped)
+ .where(0).equalTo(0).projectFirst(1).projectSecond(0);
+ // the join result is passed as both arguments of closeWith, so the iteration itself is closed
+ iteration.closeWith(joined, joined)
+ .print();
+ // plan creation is outside the inner try (a failure there fails the test); compileNoStats must throw
+ Plan p = env.createProgramPlan();
+ try {
+ compileNoStats(p);
+ fail("should throw an exception");
+ }
+ catch (InvalidProgramException e) {
+ // expected
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
new file mode 100644
index 0000000..8bb9a76
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.java;
+
+import static org.junit.Assert.*;
+
+import java.util.Collections;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.CompilerTestBase;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class PartitionOperatorTest extends CompilerTestBase {
+ // Checks that a group-reduce on the field used by a preceding custom partitioning reads its input FORWARD.
+ @Test
+ public void testPartitionOperatorPreservesFields() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple2<Long, Long>> data = env.fromCollection(Collections.singleton(new Tuple2<Long, Long>(0L, 0L)));
+ // custom-partition on field 1, then group on the same field; the partitioner body ignores numPartitions and is only part of the plan here
+ data.partitionCustom(new Partitioner<Long>() {
+ @Override public int partition(Long key, int numPartitions) { return key.intValue(); }
+ }, 1)
+ .groupBy(1)
+ .reduceGroup(new IdentityGroupReducer<Tuple2<Long,Long>>())
+ .print();
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+ // expected plan shape, walking backwards from the sink: sink <- group reducer <- partition operator
+ SinkPlanNode sink = op.getDataSinks().iterator().next();
+ SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
+ SingleInputPlanNode partitioner = (SingleInputPlanNode) reducer.getInput().getSource();
+ // the reducer must not re-partition (FORWARD); the custom partitioning happens on the partitioner's input
+ assertEquals(ShipStrategyType.FORWARD, reducer.getInput().getShipStrategy());
+ assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput().getShipStrategy());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
new file mode 100644
index 0000000..0724a9f
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.java;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Test;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.optimizer.CompilerTestBase;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.SourcePlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+import static org.junit.Assert.*;
+
+@SuppressWarnings("serial")
+public class ReduceCompilationTest extends CompilerTestBase implements java.io.Serializable {
+
+ @Test
+ public void testAllReduceNoCombiner() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(8);
+
+ DataSet<Double> data = env.fromElements(0.2, 0.3, 0.4, 0.5).name("source");
+
+ data.reduce(new RichReduceFunction<Double>() {
+
+ @Override
+ public Double reduce(Double value1, Double value2){
+ return value1 + value2;
+ }
+ }).name("reducer")
+ .print().name("sink");
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+
+ // the all-reduce has no combiner, when the DOP of the input is one
+
+ SourcePlanNode sourceNode = resolver.getNode("source");
+ SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+ SinkPlanNode sinkNode = resolver.getNode("sink");
+
+ // check wiring
+ assertEquals(sourceNode, reduceNode.getInput().getSource());
+ assertEquals(reduceNode, sinkNode.getInput().getSource());
+
+ // check DOP
+ assertEquals(1, sourceNode.getParallelism());
+ assertEquals(1, reduceNode.getParallelism());
+ assertEquals(1, sinkNode.getParallelism());
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testAllReduceWithCombiner() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(8);
+
+ DataSet<Long> data = env.generateSequence(1, 8000000).name("source");
+
+ data.reduce(new RichReduceFunction<Long>() {
+
+ @Override
+ public Long reduce(Long value1, Long value2){
+ return value1 + value2;
+ }
+ }).name("reducer")
+ .print().name("sink");
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+ // get the original nodes
+ SourcePlanNode sourceNode = resolver.getNode("source");
+ SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+ SinkPlanNode sinkNode = resolver.getNode("sink");
+
+ // get the combiner
+ SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
+
+ // check wiring
+ assertEquals(sourceNode, combineNode.getInput().getSource());
+ assertEquals(reduceNode, sinkNode.getInput().getSource());
+
+ // check that both reduce and combiner have the same strategy
+ assertEquals(DriverStrategy.ALL_REDUCE, reduceNode.getDriverStrategy());
+ assertEquals(DriverStrategy.ALL_REDUCE, combineNode.getDriverStrategy());
+
+ // check DOP
+ assertEquals(8, sourceNode.getParallelism());
+ assertEquals(8, combineNode.getParallelism());
+ assertEquals(1, reduceNode.getParallelism());
+ assertEquals(1, sinkNode.getParallelism());
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testGroupedReduceWithFieldPositionKey() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(8);
+
+ DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+ .name("source").setParallelism(6);
+
+ data
+ .groupBy(1)
+ .reduce(new RichReduceFunction<Tuple2<String,Double>>() {
+ @Override
+ public Tuple2<String, Double> reduce(Tuple2<String, Double> value1, Tuple2<String, Double> value2){
+ return null;
+ }
+ }).name("reducer")
+ .print().name("sink");
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+ // get the original nodes
+ SourcePlanNode sourceNode = resolver.getNode("source");
+ SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+ SinkPlanNode sinkNode = resolver.getNode("sink");
+
+ // get the combiner
+ SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
+
+ // check wiring
+ assertEquals(sourceNode, combineNode.getInput().getSource());
+ assertEquals(reduceNode, sinkNode.getInput().getSource());
+
+ // check that both reduce and combiner have the same strategy
+ assertEquals(DriverStrategy.SORTED_REDUCE, reduceNode.getDriverStrategy());
+ assertEquals(DriverStrategy.SORTED_PARTIAL_REDUCE, combineNode.getDriverStrategy());
+
+ // check the keys
+ assertEquals(new FieldList(1), reduceNode.getKeys(0));
+ assertEquals(new FieldList(1), combineNode.getKeys(0));
+ assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
+
+ // check DOP
+ assertEquals(6, sourceNode.getParallelism());
+ assertEquals(6, combineNode.getParallelism());
+ assertEquals(8, reduceNode.getParallelism());
+ assertEquals(8, sinkNode.getParallelism());
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testGroupedReduceWithSelectorFunctionKey() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(8);
+
+ DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+ .name("source").setParallelism(6);
+
+ data
+ .groupBy(new KeySelector<Tuple2<String,Double>, String>() {
+ public String getKey(Tuple2<String, Double> value) { return value.f0; }
+ })
+ .reduce(new RichReduceFunction<Tuple2<String,Double>>() {
+ @Override
+ public Tuple2<String, Double> reduce(Tuple2<String, Double> value1, Tuple2<String, Double> value2){
+ return null;
+ }
+ }).name("reducer")
+ .print().name("sink");
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+ // get the original nodes
+ SourcePlanNode sourceNode = resolver.getNode("source");
+ SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+ SinkPlanNode sinkNode = resolver.getNode("sink");
+
+ // get the combiner
+ SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
+
+ // get the key extractors and projectors
+ SingleInputPlanNode keyExtractor = (SingleInputPlanNode) combineNode.getInput().getSource();
+ SingleInputPlanNode keyProjector = (SingleInputPlanNode) sinkNode.getInput().getSource();
+
+ // check wiring
+ assertEquals(sourceNode, keyExtractor.getInput().getSource());
+ assertEquals(keyProjector, sinkNode.getInput().getSource());
+
+ // check that both reduce and combiner have the same strategy
+ assertEquals(DriverStrategy.SORTED_REDUCE, reduceNode.getDriverStrategy());
+ assertEquals(DriverStrategy.SORTED_PARTIAL_REDUCE, combineNode.getDriverStrategy());
+
+ // check the keys
+ assertEquals(new FieldList(0), reduceNode.getKeys(0));
+ assertEquals(new FieldList(0), combineNode.getKeys(0));
+ assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
+
+ // check DOP
+ assertEquals(6, sourceNode.getParallelism());
+ assertEquals(6, keyExtractor.getParallelism());
+ assertEquals(6, combineNode.getParallelism());
+
+ assertEquals(8, reduceNode.getParallelism());
+ assertEquals(8, keyProjector.getParallelism());
+ assertEquals(8, sinkNode.getParallelism());
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+ }
+ }
+}