You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/06/17 02:09:11 UTC
[1/6] [FLINK-935] (continued) Correct feedback property check for
iterative algorithms.
Repository: incubator-flink
Updated Branches:
refs/heads/master 708426ba6 -> ca2b287a7
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/FeedbackPropertiesMatchTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/FeedbackPropertiesMatchTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/FeedbackPropertiesMatchTest.java
new file mode 100644
index 0000000..cdbea5b
--- /dev/null
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/FeedbackPropertiesMatchTest.java
@@ -0,0 +1,1432 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.pact.compiler;
+
+import static org.junit.Assert.*;
+import static eu.stratosphere.compiler.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport.*;
+
+import org.junit.Test;
+
+import eu.stratosphere.api.common.functions.GenericJoiner;
+import eu.stratosphere.api.common.functions.GenericMap;
+import eu.stratosphere.api.common.operators.BinaryOperatorInformation;
+import eu.stratosphere.api.common.operators.OperatorInformation;
+import eu.stratosphere.api.common.operators.Order;
+import eu.stratosphere.api.common.operators.Ordering;
+import eu.stratosphere.api.common.operators.UnaryOperatorInformation;
+import eu.stratosphere.api.common.operators.base.GenericDataSourceBase;
+import eu.stratosphere.api.common.operators.base.JoinOperatorBase;
+import eu.stratosphere.api.common.operators.base.MapOperatorBase;
+import eu.stratosphere.api.common.operators.util.FieldList;
+import eu.stratosphere.api.common.operators.util.FieldSet;
+import eu.stratosphere.api.java.io.TextInputFormat;
+import eu.stratosphere.api.java.typeutils.BasicTypeInfo;
+import eu.stratosphere.compiler.dag.DataSourceNode;
+import eu.stratosphere.compiler.dag.MapNode;
+import eu.stratosphere.compiler.dag.MatchNode;
+import eu.stratosphere.compiler.dataproperties.GlobalProperties;
+import eu.stratosphere.compiler.dataproperties.LocalProperties;
+import eu.stratosphere.compiler.dataproperties.RequestedGlobalProperties;
+import eu.stratosphere.compiler.dataproperties.RequestedLocalProperties;
+import eu.stratosphere.compiler.plan.Channel;
+import eu.stratosphere.compiler.plan.DualInputPlanNode;
+import eu.stratosphere.compiler.plan.SingleInputPlanNode;
+import eu.stratosphere.compiler.plan.SourcePlanNode;
+import eu.stratosphere.compiler.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport;
+import eu.stratosphere.core.fs.Path;
+import eu.stratosphere.pact.compiler.testfunctions.DummyJoinFunction;
+import eu.stratosphere.pact.compiler.testfunctions.IdentityMapper;
+import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
+import eu.stratosphere.pact.runtime.task.DriverStrategy;
+import eu.stratosphere.pact.runtime.task.util.LocalStrategy;
+
+
+public class FeedbackPropertiesMatchTest {
+
+ @Test
+ public void testNoPartialSolutionFoundSingleInputOnly() {
+ try {
+ SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Source");
+
+ SourcePlanNode otherTarget = new SourcePlanNode(getSourceNode(), "Source");
+
+ Channel toMap1 = new Channel(target);
+ toMap1.setShipStrategy(ShipStrategyType.FORWARD);
+ toMap1.setLocalStrategy(LocalStrategy.NONE);
+ SingleInputPlanNode map1 = new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
+
+ Channel toMap2 = new Channel(map1);
+ toMap2.setShipStrategy(ShipStrategyType.FORWARD);
+ toMap2.setLocalStrategy(LocalStrategy.NONE);
+ SingleInputPlanNode map2 = new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
+
+ {
+ GlobalProperties gp = new GlobalProperties();
+ LocalProperties lp = new LocalProperties();
+
+ FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(otherTarget, gp, lp);
+ assertTrue(report == NO_PARTIAL_SOLUTION);
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+    /**
+     * Checks the feedback-property report for the chain source -> map1 -> map2, where both
+     * channels FORWARD without a local strategy, and requirements are placed on the first
+     * channel, the second channel, or both.
+     *
+     * <p>All scenarios share and mutate the same two channels. The first four scenarios run
+     * before any requirement has been set on either channel; every later scenario sets the
+     * required global and local properties of both channels explicitly.
+     *
+     * <p>Exceptions propagate to JUnit so that a failure keeps its full stack trace.
+     */
+    @Test
+    public void testSingleInputOperators() throws Exception {
+        SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Source");
+
+        Channel toMap1 = new Channel(target);
+        toMap1.setShipStrategy(ShipStrategyType.FORWARD);
+        toMap1.setLocalStrategy(LocalStrategy.NONE);
+        SingleInputPlanNode map1 = new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
+
+        Channel toMap2 = new Channel(map1);
+        toMap2.setShipStrategy(ShipStrategyType.FORWARD);
+        toMap2.setLocalStrategy(LocalStrategy.NONE);
+        SingleInputPlanNode map2 = new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
+
+        // no feedback properties and none are ever required and present
+        {
+            GlobalProperties gp = new GlobalProperties();
+            LocalProperties lp = new LocalProperties();
+
+            FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+            assertTrue("unexpected report: " + report,
+                    report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+        }
+
+        // some global feedback properties and none are ever required and present
+        {
+            GlobalProperties gp = new GlobalProperties();
+            gp.setHashPartitioned(new FieldList(2, 5));
+            LocalProperties lp = new LocalProperties();
+
+            FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+            assertTrue("unexpected report: " + report,
+                    report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+        }
+
+        // some local feedback properties and none are ever required and present
+        {
+            GlobalProperties gp = new GlobalProperties();
+            LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
+
+            FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+            assertTrue("unexpected report: " + report,
+                    report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+        }
+
+        // some global and local feedback properties and none are ever required and present
+        {
+            GlobalProperties gp = new GlobalProperties();
+            gp.setHashPartitioned(new FieldList(2, 5));
+            LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
+
+            FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+            assertTrue("unexpected report: " + report,
+                    report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+        }
+
+        // --------------------------- requirements on channel 1 -----------------------
+
+        // some required global properties, which are matched exactly
+        {
+            GlobalProperties gp = new GlobalProperties();
+            gp.setHashPartitioned(new FieldList(2, 5));
+            LocalProperties lp = new LocalProperties();
+
+            RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+            reqGp.setHashPartitioned(new FieldList(2, 5));
+
+            toMap1.setRequiredGlobalProps(reqGp);
+            toMap1.setRequiredLocalProps(null);
+
+            toMap2.setRequiredGlobalProps(null);
+            toMap2.setRequiredLocalProps(null);
+
+            FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+            assertTrue("unexpected report: " + report,
+                    report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+        }
+
+        // some required local properties, which are matched exactly
+        {
+            GlobalProperties gp = new GlobalProperties();
+            LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
+
+            RequestedLocalProperties reqLp = new RequestedLocalProperties();
+            reqLp.setGroupedFields(new FieldList(1, 2));
+
+            toMap1.setRequiredGlobalProps(null);
+            toMap1.setRequiredLocalProps(reqLp);
+
+            toMap2.setRequiredGlobalProps(null);
+            toMap2.setRequiredLocalProps(null);
+
+            FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+            assertTrue("unexpected report: " + report,
+                    report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+        }
+
+        // some required global and local properties, which are matched exactly
+        {
+            GlobalProperties gp = new GlobalProperties();
+            gp.setHashPartitioned(new FieldList(2, 5));
+            LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
+
+            RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+            reqGp.setHashPartitioned(new FieldList(2, 5));
+
+            RequestedLocalProperties reqLp = new RequestedLocalProperties();
+            reqLp.setGroupedFields(new FieldList(1, 2));
+
+            toMap1.setRequiredGlobalProps(reqGp);
+            toMap1.setRequiredLocalProps(reqLp);
+
+            toMap2.setRequiredGlobalProps(null);
+            toMap2.setRequiredLocalProps(null);
+
+            FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+            assertTrue("unexpected report: " + report,
+                    report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+        }
+
+        // some required global and local properties, which are over-fulfilled
+        {
+            GlobalProperties gp = new GlobalProperties();
+            gp.setHashPartitioned(new FieldList(2));
+            LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
+
+            RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+            reqGp.setHashPartitioned(new FieldList(2, 5));
+
+            RequestedLocalProperties reqLp = new RequestedLocalProperties();
+            reqLp.setGroupedFields(new FieldList(1));
+
+            toMap1.setRequiredGlobalProps(reqGp);
+            toMap1.setRequiredLocalProps(reqLp);
+
+            toMap2.setRequiredGlobalProps(null);
+            toMap2.setRequiredLocalProps(null);
+
+            FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+            assertTrue("unexpected report: " + report,
+                    report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+        }
+
+        // some required global properties that are not met
+        {
+            GlobalProperties gp = new GlobalProperties();
+            gp.setHashPartitioned(new FieldList(2, 1));
+            LocalProperties lp = new LocalProperties();
+
+            RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+            reqGp.setHashPartitioned(new FieldList(2, 5));
+
+            toMap1.setRequiredGlobalProps(reqGp);
+            toMap1.setRequiredLocalProps(null);
+
+            toMap2.setRequiredGlobalProps(null);
+            toMap2.setRequiredLocalProps(null);
+
+            FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+            assertEquals(NOT_MET, report);
+        }
+
+        // some required local properties that are not met
+        {
+            GlobalProperties gp = new GlobalProperties();
+            LocalProperties lp = LocalProperties.forGrouping(new FieldList(1));
+
+            RequestedLocalProperties reqLp = new RequestedLocalProperties();
+            reqLp.setGroupedFields(new FieldList(2, 1));
+
+            toMap1.setRequiredGlobalProps(null);
+            toMap1.setRequiredLocalProps(reqLp);
+
+            toMap2.setRequiredGlobalProps(null);
+            toMap2.setRequiredLocalProps(null);
+
+            FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+            assertEquals(NOT_MET, report);
+        }
+
+        // some required global and local properties where the global properties are not met
+        {
+            GlobalProperties gp = new GlobalProperties();
+            gp.setHashPartitioned(new FieldList(2, 1));
+            LocalProperties lp = LocalProperties.forGrouping(new FieldList(1));
+
+            RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+            reqGp.setAnyPartitioning(new FieldList(2, 5));
+
+            RequestedLocalProperties reqLp = new RequestedLocalProperties();
+            reqLp.setGroupedFields(new FieldList(1));
+
+            toMap1.setRequiredGlobalProps(reqGp);
+            toMap1.setRequiredLocalProps(reqLp);
+
+            toMap2.setRequiredGlobalProps(null);
+            toMap2.setRequiredLocalProps(null);
+
+            FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+            assertEquals(NOT_MET, report);
+        }
+
+        // some required global and local properties where the local properties are not met
+        {
+            GlobalProperties gp = new GlobalProperties();
+            gp.setHashPartitioned(new FieldList(1));
+            LocalProperties lp = LocalProperties.forGrouping(new FieldList(1));
+
+            RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+            reqGp.setAnyPartitioning(new FieldList(1));
+
+            RequestedLocalProperties reqLp = new RequestedLocalProperties();
+            reqLp.setGroupedFields(new FieldList(2));
+
+            toMap1.setRequiredGlobalProps(reqGp);
+            toMap1.setRequiredLocalProps(reqLp);
+
+            toMap2.setRequiredGlobalProps(null);
+            toMap2.setRequiredLocalProps(null);
+
+            FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+            assertEquals(NOT_MET, report);
+        }
+
+        // --------------------------- requirements on channel 2 -----------------------
+
+        // some required global properties, which are matched exactly
+        {
+            GlobalProperties gp = new GlobalProperties();
+            gp.setHashPartitioned(new FieldList(2, 5));
+            LocalProperties lp = new LocalProperties();
+
+            RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+            reqGp.setHashPartitioned(new FieldList(2, 5));
+
+            toMap1.setRequiredGlobalProps(null);
+            toMap1.setRequiredLocalProps(null);
+
+            toMap2.setRequiredGlobalProps(reqGp);
+            toMap2.setRequiredLocalProps(null);
+
+            FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+            assertTrue("unexpected report: " + report,
+                    report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+        }
+
+        // some required local properties, which are matched exactly
+        {
+            GlobalProperties gp = new GlobalProperties();
+            LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
+
+            RequestedLocalProperties reqLp = new RequestedLocalProperties();
+            reqLp.setGroupedFields(new FieldList(1, 2));
+
+            toMap1.setRequiredGlobalProps(null);
+            toMap1.setRequiredLocalProps(null);
+
+            toMap2.setRequiredGlobalProps(null);
+            toMap2.setRequiredLocalProps(reqLp);
+
+            FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+            assertTrue("unexpected report: " + report,
+                    report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+        }
+
+        // some required global and local properties, which are matched exactly
+        {
+            GlobalProperties gp = new GlobalProperties();
+            gp.setHashPartitioned(new FieldList(2, 5));
+            LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
+
+            RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+            reqGp.setHashPartitioned(new FieldList(2, 5));
+
+            RequestedLocalProperties reqLp = new RequestedLocalProperties();
+            reqLp.setGroupedFields(new FieldList(1, 2));
+
+            toMap1.setRequiredGlobalProps(null);
+            toMap1.setRequiredLocalProps(null);
+
+            toMap2.setRequiredGlobalProps(reqGp);
+            toMap2.setRequiredLocalProps(reqLp);
+
+            FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+            assertTrue("unexpected report: " + report,
+                    report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+        }
+
+        // some required global and local properties, which are over-fulfilled
+        {
+            GlobalProperties gp = new GlobalProperties();
+            gp.setHashPartitioned(new FieldList(2));
+            LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
+
+            RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+            reqGp.setHashPartitioned(new FieldList(2, 5));
+
+            RequestedLocalProperties reqLp = new RequestedLocalProperties();
+            reqLp.setGroupedFields(new FieldList(1));
+
+            toMap1.setRequiredGlobalProps(null);
+            toMap1.setRequiredLocalProps(null);
+
+            toMap2.setRequiredGlobalProps(reqGp);
+            toMap2.setRequiredLocalProps(reqLp);
+
+            FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+            assertTrue("unexpected report: " + report,
+                    report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+        }
+
+        // some required global properties that are not met
+        {
+            GlobalProperties gp = new GlobalProperties();
+            gp.setHashPartitioned(new FieldList(2, 1));
+            LocalProperties lp = new LocalProperties();
+
+            RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+            reqGp.setHashPartitioned(new FieldList(2, 5));
+
+            toMap1.setRequiredGlobalProps(null);
+            toMap1.setRequiredLocalProps(null);
+
+            toMap2.setRequiredGlobalProps(reqGp);
+            toMap2.setRequiredLocalProps(null);
+
+            FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+            assertEquals(NOT_MET, report);
+        }
+
+        // some required local properties that are not met
+        {
+            GlobalProperties gp = new GlobalProperties();
+            LocalProperties lp = LocalProperties.forGrouping(new FieldList(1));
+
+            RequestedLocalProperties reqLp = new RequestedLocalProperties();
+            reqLp.setGroupedFields(new FieldList(2, 1));
+
+            toMap1.setRequiredGlobalProps(null);
+            toMap1.setRequiredLocalProps(null);
+
+            toMap2.setRequiredGlobalProps(null);
+            toMap2.setRequiredLocalProps(reqLp);
+
+            FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+            assertEquals(NOT_MET, report);
+        }
+
+        // some required global and local properties where the global properties are not met
+        {
+            GlobalProperties gp = new GlobalProperties();
+            gp.setHashPartitioned(new FieldList(2, 1));
+            LocalProperties lp = LocalProperties.forGrouping(new FieldList(1));
+
+            RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+            reqGp.setAnyPartitioning(new FieldList(2, 5));
+
+            RequestedLocalProperties reqLp = new RequestedLocalProperties();
+            reqLp.setGroupedFields(new FieldList(1));
+
+            toMap1.setRequiredGlobalProps(null);
+            toMap1.setRequiredLocalProps(null);
+
+            toMap2.setRequiredGlobalProps(reqGp);
+            toMap2.setRequiredLocalProps(reqLp);
+
+            FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+            assertEquals(NOT_MET, report);
+        }
+
+        // some required global and local properties where the local properties are not met
+        {
+            GlobalProperties gp = new GlobalProperties();
+            gp.setHashPartitioned(new FieldList(1));
+            LocalProperties lp = LocalProperties.forGrouping(new FieldList(1));
+
+            RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+            reqGp.setAnyPartitioning(new FieldList(1));
+
+            RequestedLocalProperties reqLp = new RequestedLocalProperties();
+            reqLp.setGroupedFields(new FieldList(2));
+
+            toMap1.setRequiredGlobalProps(null);
+            toMap1.setRequiredLocalProps(null);
+
+            toMap2.setRequiredGlobalProps(reqGp);
+            toMap2.setRequiredLocalProps(reqLp);
+
+            FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+            assertEquals(NOT_MET, report);
+        }
+
+        // ---------------------- requirements mixed on 1 and 2 -----------------------
+
+        // some required global properties at step one and some more at step 2
+        {
+            GlobalProperties gp = new GlobalProperties();
+            gp.setHashPartitioned(new FieldList(1, 2));
+            LocalProperties lp = LocalProperties.EMPTY;
+
+            RequestedGlobalProperties reqGp1 = new RequestedGlobalProperties();
+            reqGp1.setAnyPartitioning(new FieldList(1, 2));
+
+            RequestedGlobalProperties reqGp2 = new RequestedGlobalProperties();
+            reqGp2.setHashPartitioned(new FieldList(1, 2));
+
+            toMap1.setRequiredGlobalProps(reqGp1);
+            toMap1.setRequiredLocalProps(null);
+
+            toMap2.setRequiredGlobalProps(reqGp2);
+            toMap2.setRequiredLocalProps(null);
+
+            FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+            assertTrue("unexpected report: " + report,
+                    report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+        }
+
+        // some required local properties at step one and some more at step 2
+        {
+            GlobalProperties gp = new GlobalProperties();
+            LocalProperties lp = LocalProperties.forOrdering(new Ordering(3, null, Order.ASCENDING).appendOrdering(1, null, Order.DESCENDING));
+
+            RequestedLocalProperties reqLp1 = new RequestedLocalProperties();
+            reqLp1.setGroupedFields(new FieldList(3, 1));
+
+            RequestedLocalProperties reqLp2 = new RequestedLocalProperties();
+            reqLp2.setOrdering(new Ordering(3, null, Order.ANY).appendOrdering(1, null, Order.ANY));
+
+            toMap1.setRequiredGlobalProps(null);
+            toMap1.setRequiredLocalProps(reqLp1);
+
+            toMap2.setRequiredGlobalProps(null);
+            toMap2.setRequiredLocalProps(reqLp2);
+
+            FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+            assertTrue("unexpected report: " + report,
+                    report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+        }
+
+        // some required global properties at step one and some local ones at step 2
+        {
+            GlobalProperties gp = new GlobalProperties();
+            gp.setHashPartitioned(new FieldList(1, 2));
+            LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+
+            RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+            reqGp.setAnyPartitioning(new FieldList(1, 2));
+
+            RequestedLocalProperties reqLp = new RequestedLocalProperties();
+            reqLp.setGroupedFields(new FieldList(2));
+
+            toMap1.setRequiredGlobalProps(reqGp);
+            toMap1.setRequiredLocalProps(null);
+
+            toMap2.setRequiredGlobalProps(null);
+            toMap2.setRequiredLocalProps(reqLp);
+
+            FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+            assertTrue("unexpected report: " + report,
+                    report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+        }
+
+        // some required local properties at step one and some global ones at step 2
+        {
+            GlobalProperties gp = new GlobalProperties();
+            gp.setHashPartitioned(new FieldList(1, 2));
+            LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+
+            RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+            reqGp.setAnyPartitioning(new FieldList(1, 2));
+
+            RequestedLocalProperties reqLp = new RequestedLocalProperties();
+            reqLp.setGroupedFields(new FieldList(2));
+
+            toMap1.setRequiredGlobalProps(null);
+            toMap1.setRequiredLocalProps(reqLp);
+
+            toMap2.setRequiredGlobalProps(reqGp);
+            toMap2.setRequiredLocalProps(null);
+
+            FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+            assertTrue("unexpected report: " + report,
+                    report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+        }
+
+        // some fulfilled global properties at step one and some non-fulfilled local ones at step 2
+        {
+            GlobalProperties gp = new GlobalProperties();
+            gp.setHashPartitioned(new FieldList(1, 2));
+            LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+
+            RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+            reqGp.setAnyPartitioning(new FieldList(1, 2));
+
+            RequestedLocalProperties reqLp = new RequestedLocalProperties();
+            reqLp.setGroupedFields(new FieldList(2, 3));
+
+            toMap1.setRequiredGlobalProps(reqGp);
+            toMap1.setRequiredLocalProps(null);
+
+            toMap2.setRequiredGlobalProps(null);
+            toMap2.setRequiredLocalProps(reqLp);
+
+            FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+            assertEquals(NOT_MET, report);
+        }
+
+        // some fulfilled local properties at step one and some non-fulfilled global ones at step 2
+        {
+            GlobalProperties gp = new GlobalProperties();
+            gp.setHashPartitioned(new FieldList(1, 2));
+            LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+
+            RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+            reqGp.setAnyPartitioning(new FieldList(2, 3));
+
+            RequestedLocalProperties reqLp = new RequestedLocalProperties();
+            reqLp.setGroupedFields(new FieldList(2, 1));
+
+            toMap1.setRequiredGlobalProps(null);
+            toMap1.setRequiredLocalProps(reqLp);
+
+            toMap2.setRequiredGlobalProps(reqGp);
+            toMap2.setRequiredLocalProps(null);
+
+            FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+            assertEquals(NOT_MET, report);
+        }
+
+        // some non-fulfilled global properties at step one and some fulfilled local ones at step 2
+        {
+            GlobalProperties gp = new GlobalProperties();
+            gp.setHashPartitioned(new FieldList(1, 2));
+            LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+
+            RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+            reqGp.setAnyPartitioning(new FieldList(2, 3));
+
+            RequestedLocalProperties reqLp = new RequestedLocalProperties();
+            reqLp.setGroupedFields(new FieldList(2, 1));
+
+            toMap1.setRequiredGlobalProps(reqGp);
+            toMap1.setRequiredLocalProps(null);
+
+            toMap2.setRequiredGlobalProps(null);
+            toMap2.setRequiredLocalProps(reqLp);
+
+            FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+            assertEquals(NOT_MET, report);
+        }
+
+        // some non-fulfilled local properties at step one and some fulfilled global ones at step 2
+        {
+            GlobalProperties gp = new GlobalProperties();
+            gp.setHashPartitioned(new FieldList(1, 2));
+            LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+
+            RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+            reqGp.setAnyPartitioning(new FieldList(1, 2));
+
+            RequestedLocalProperties reqLp = new RequestedLocalProperties();
+            reqLp.setGroupedFields(new FieldList(2, 1, 3));
+
+            toMap1.setRequiredGlobalProps(null);
+            toMap1.setRequiredLocalProps(reqLp);
+
+            toMap2.setRequiredGlobalProps(reqGp);
+            toMap2.setRequiredLocalProps(null);
+
+            FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+            assertEquals(NOT_MET, report);
+        }
+    }
+
+    /**
+     * Checks how a ship or local strategy set on one of the channels of the chain
+     * source -> map1 -> map2 interacts with the feedback properties.
+     *
+     * <p>A strategy on the first channel makes non-matching feedback properties irrelevant for a
+     * requirement on the second channel. A strategy on the second channel does not help a
+     * non-matching requirement on the first channel. A requirement on the same channel that sets
+     * the matching strategies is met.
+     *
+     * <p>All scenarios share and mutate the same two channels. Each scenario sets ship/local
+     * strategies and requirements of both channels explicitly. Exceptions propagate to JUnit so
+     * that a failure keeps its full stack trace.
+     */
+    @Test
+    public void testSingleInputOperatorsWithReCreation() throws Exception {
+        SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Source");
+
+        Channel toMap1 = new Channel(target);
+        SingleInputPlanNode map1 = new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
+
+        Channel toMap2 = new Channel(map1);
+        SingleInputPlanNode map2 = new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
+
+        // set ship strategy in first channel, so later non matching global properties do not matter
+        {
+            GlobalProperties gp = new GlobalProperties();
+            gp.setHashPartitioned(new FieldList(1, 2));
+            LocalProperties lp = LocalProperties.EMPTY;
+
+            RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+            reqGp.setAnyPartitioning(new FieldSet(2, 5));
+
+            toMap1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(2, 5));
+            toMap1.setLocalStrategy(LocalStrategy.NONE);
+
+            toMap2.setShipStrategy(ShipStrategyType.FORWARD);
+            toMap2.setLocalStrategy(LocalStrategy.NONE);
+
+            toMap1.setRequiredGlobalProps(null);
+            toMap1.setRequiredLocalProps(null);
+
+            toMap2.setRequiredGlobalProps(reqGp);
+            toMap2.setRequiredLocalProps(null);
+
+            FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+            assertEquals(MET, report);
+        }
+
+        // set ship strategy in second channel, so previous non matching global properties void the match
+        {
+            GlobalProperties gp = new GlobalProperties();
+            gp.setHashPartitioned(new FieldList(1, 2));
+            LocalProperties lp = LocalProperties.EMPTY;
+
+            RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+            reqGp.setAnyPartitioning(new FieldSet(2, 5));
+
+            toMap1.setShipStrategy(ShipStrategyType.FORWARD);
+            toMap1.setLocalStrategy(LocalStrategy.NONE);
+
+            toMap2.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(2, 5));
+            toMap2.setLocalStrategy(LocalStrategy.NONE);
+
+            toMap1.setRequiredGlobalProps(reqGp);
+            toMap1.setRequiredLocalProps(null);
+
+            toMap2.setRequiredGlobalProps(null);
+            toMap2.setRequiredLocalProps(null);
+
+            FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+            assertEquals(NOT_MET, report);
+        }
+
+        // set local strategy in first channel, so later non matching local properties do not matter
+        {
+            GlobalProperties gp = new GlobalProperties();
+            gp.setHashPartitioned(new FieldList(1, 2));
+            LocalProperties lp = LocalProperties.forOrdering(new Ordering(3, null, Order.ASCENDING).appendOrdering(1, null, Order.DESCENDING));
+
+            RequestedLocalProperties reqLp = new RequestedLocalProperties();
+            reqLp.setGroupedFields(new FieldList(4, 1));
+
+            toMap1.setShipStrategy(ShipStrategyType.FORWARD);
+            toMap1.setLocalStrategy(LocalStrategy.SORT, new FieldList(5, 7), new boolean[] {false, false});
+
+            toMap2.setShipStrategy(ShipStrategyType.FORWARD);
+            toMap2.setLocalStrategy(LocalStrategy.NONE);
+
+            toMap1.setRequiredGlobalProps(null);
+            toMap1.setRequiredLocalProps(null);
+
+            toMap2.setRequiredGlobalProps(null);
+            toMap2.setRequiredLocalProps(reqLp);
+
+            FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+            assertTrue("unexpected report: " + report,
+                    report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+        }
+
+        // set local strategy in second channel, so previous non matching local properties void the match
+        {
+            GlobalProperties gp = new GlobalProperties();
+            gp.setHashPartitioned(new FieldList(1, 2));
+            LocalProperties lp = LocalProperties.forOrdering(new Ordering(3, null, Order.ASCENDING).appendOrdering(1, null, Order.DESCENDING));
+
+            RequestedLocalProperties reqLp = new RequestedLocalProperties();
+            reqLp.setGroupedFields(new FieldList(4, 1));
+
+            toMap1.setShipStrategy(ShipStrategyType.FORWARD);
+            toMap1.setLocalStrategy(LocalStrategy.NONE);
+
+            toMap2.setShipStrategy(ShipStrategyType.FORWARD);
+            toMap2.setLocalStrategy(LocalStrategy.SORT, new FieldList(5, 7), new boolean[] {false, false});
+
+            toMap1.setRequiredGlobalProps(null);
+            toMap1.setRequiredLocalProps(reqLp);
+
+            toMap2.setRequiredGlobalProps(null);
+            toMap2.setRequiredLocalProps(null);
+
+            FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+            assertEquals(NOT_MET, report);
+        }
+
+        // create the properties on the same node as the requirement
+        {
+            GlobalProperties gp = new GlobalProperties();
+            gp.setHashPartitioned(new FieldList(1, 2));
+            LocalProperties lp = LocalProperties.forOrdering(new Ordering(3, null, Order.ASCENDING).appendOrdering(1, null, Order.DESCENDING));
+
+            RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+            reqGp.setAnyPartitioning(new FieldSet(5, 7));
+
+            RequestedLocalProperties reqLp = new RequestedLocalProperties();
+            reqLp.setGroupedFields(new FieldList(5, 7));
+
+            toMap1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(5, 7));
+            toMap1.setLocalStrategy(LocalStrategy.SORT, new FieldList(5, 7), new boolean[] {false, false});
+
+            toMap2.setShipStrategy(ShipStrategyType.FORWARD);
+            toMap2.setLocalStrategy(LocalStrategy.NONE);
+
+            toMap1.setRequiredGlobalProps(reqGp);
+            toMap1.setRequiredLocalProps(reqLp);
+
+            toMap2.setRequiredGlobalProps(null);
+            toMap2.setRequiredLocalProps(null);
+
+            FeedbackPropertiesMeetRequirementsReport report = map2.checkPartialSolutionPropertiesMet(target, gp, lp);
+            assertEquals(MET, report);
+        }
+    }
+
+ /**
+ * Chain of three mappers on top of the partial solution: target -> map1 -> map2 -> map3.
+ * Each scenario puts a strategy on the first channel (toMap1) and a requirement on the last
+ * channel (toMap3) that the given feedback properties do not satisfy; the strategy on the first
+ * channel is expected to make that mismatch irrelevant.
+ * All three channels are reconfigured (ship/local strategy and required properties) in every
+ * scenario, because the channel objects are shared between the scenarios.
+ */
+ @Test
+ public void testSingleInputOperatorsChainOfThree() {
+ try {
+ SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Source");
+
+ Channel toMap1 = new Channel(target);
+ SingleInputPlanNode map1 = new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
+
+ Channel toMap2 = new Channel(map1);
+ SingleInputPlanNode map2 = new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
+
+ Channel toMap3 = new Channel(map2);
+ SingleInputPlanNode map3 = new SingleInputPlanNode(getMapNode(), "Mapper 3", toMap3, DriverStrategy.MAP);
+
+ // set local strategy in first channel, so later non matching local properties do not matter
+ {
+ GlobalProperties gp = new GlobalProperties();
+ LocalProperties lp = LocalProperties.forOrdering(new Ordering(3, null, Order.ASCENDING).appendOrdering(1, null, Order.DESCENDING));
+
+ // grouping on (4, 1) is not provided by the feedback ordering on (3, 1)
+ RequestedLocalProperties reqLp = new RequestedLocalProperties();
+ reqLp.setGroupedFields(new FieldList(4, 1));
+
+ // local re-sort right after the partial solution
+ toMap1.setShipStrategy(ShipStrategyType.FORWARD);
+ toMap1.setLocalStrategy(LocalStrategy.SORT, new FieldList(5, 7), new boolean[] {false, false});
+
+ toMap2.setShipStrategy(ShipStrategyType.FORWARD);
+ toMap2.setLocalStrategy(LocalStrategy.NONE);
+
+ toMap3.setShipStrategy(ShipStrategyType.FORWARD);
+ toMap3.setLocalStrategy(LocalStrategy.NONE);
+
+ toMap1.setRequiredGlobalProps(null);
+ toMap1.setRequiredLocalProps(null);
+
+ toMap2.setRequiredGlobalProps(null);
+ toMap2.setRequiredLocalProps(null);
+
+ // only the last channel carries a (local) requirement
+ toMap3.setRequiredGlobalProps(null);
+ toMap3.setRequiredLocalProps(reqLp);
+
+ // the check starts at the end of the chain (map3)
+ FeedbackPropertiesMeetRequirementsReport report = map3.checkPartialSolutionPropertiesMet(target, gp, lp);
+ // the exact report is not pinned; anything except null, NO_PARTIAL_SOLUTION and NOT_MET is accepted
+ assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+ }
+
+ // set global strategy in first channel, so later non matching global properties do not matter
+ {
+ GlobalProperties gp = new GlobalProperties();
+ gp.setHashPartitioned(new FieldList(5, 3));
+ LocalProperties lp = LocalProperties.EMPTY;
+
+ // any partitioning on (2, 3) is requested, while the feedback is hash partitioned on (5, 3)
+ RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
+ reqGp.setAnyPartitioning(new FieldSet(2, 3));
+
+ // re-partition right after the partial solution
+ toMap1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(1, 2));
+ toMap1.setLocalStrategy(LocalStrategy.NONE);
+
+ toMap2.setShipStrategy(ShipStrategyType.FORWARD);
+ toMap2.setLocalStrategy(LocalStrategy.NONE);
+
+ toMap3.setShipStrategy(ShipStrategyType.FORWARD);
+ toMap3.setLocalStrategy(LocalStrategy.NONE);
+
+ toMap1.setRequiredGlobalProps(null);
+ toMap1.setRequiredLocalProps(null);
+
+ toMap2.setRequiredGlobalProps(null);
+ toMap2.setRequiredLocalProps(null);
+
+ // only the last channel carries a (global) requirement
+ toMap3.setRequiredGlobalProps(reqGp);
+ toMap3.setRequiredLocalProps(null);
+
+ FeedbackPropertiesMeetRequirementsReport report = map3.checkPartialSolutionPropertiesMet(target, gp, lp);
+ assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * A join whose two inputs are fed exclusively by sources other than the partial solution must
+ * report NO_PARTIAL_SOLUTION.
+ */
+ @Test
+ public void testNoPartialSolutionFoundTwoInputOperator() {
+ try {
+ SourcePlanNode partialSolution = new SourcePlanNode(getSourceNode(), "Partial Solution");
+
+ SourcePlanNode leftSource = new SourcePlanNode(getSourceNode(), "Source 1");
+ SourcePlanNode rightSource = new SourcePlanNode(getSourceNode(), "Source 2");
+
+ // left branch: Source 1 -> Mapper 1
+ Channel leftMapIn = new Channel(leftSource);
+ leftMapIn.setShipStrategy(ShipStrategyType.FORWARD);
+ leftMapIn.setLocalStrategy(LocalStrategy.NONE);
+ SingleInputPlanNode leftMapper = new SingleInputPlanNode(getMapNode(), "Mapper 1", leftMapIn, DriverStrategy.MAP);
+
+ // right branch: Source 2 -> Mapper 2
+ Channel rightMapIn = new Channel(rightSource);
+ rightMapIn.setShipStrategy(ShipStrategyType.FORWARD);
+ rightMapIn.setLocalStrategy(LocalStrategy.NONE);
+ SingleInputPlanNode rightMapper = new SingleInputPlanNode(getMapNode(), "Mapper 2", rightMapIn, DriverStrategy.MAP);
+
+ // both mapper outputs are forwarded unchanged into the join
+ Channel leftJoinIn = new Channel(leftMapper);
+ leftJoinIn.setShipStrategy(ShipStrategyType.FORWARD);
+ leftJoinIn.setLocalStrategy(LocalStrategy.NONE);
+
+ Channel rightJoinIn = new Channel(rightMapper);
+ rightJoinIn.setShipStrategy(ShipStrategyType.FORWARD);
+ rightJoinIn.setLocalStrategy(LocalStrategy.NONE);
+
+ DualInputPlanNode join = new DualInputPlanNode(getJoinNode(), "Join", leftJoinIn, rightJoinIn, DriverStrategy.HYBRIDHASH_BUILD_FIRST);
+
+ assertEquals(NO_PARTIAL_SOLUTION, join.checkPartialSolutionPropertiesMet(partialSolution, new GlobalProperties(), new LocalProperties()));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Join whose first input descends from the partial solution and whose second input comes from an
+ * independent source:
+ * <pre>
+ * Partial Solution -> Mapper 1 --\
+ *                                 Join -> After Join Mapper
+ * Other Source -----> Mapper 2 --/
+ * </pre>
+ * Strategies and requirements placed on the independent branch are expected not to influence the
+ * verdict. The channels are shared across the scenarios below, so settings made in one scenario stay
+ * in effect for the following ones unless they are explicitly overwritten.
+ */
+ @Test
+ public void testTwoOperatorsOneIndependent() {
+ try {
+ SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Partial Solution");
+ SourcePlanNode source = new SourcePlanNode(getSourceNode(), "Other Source");
+
+ Channel toMap1 = new Channel(target);
+ toMap1.setShipStrategy(ShipStrategyType.FORWARD);
+ toMap1.setLocalStrategy(LocalStrategy.NONE);
+ SingleInputPlanNode map1 = new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
+
+ Channel toMap2 = new Channel(source);
+ toMap2.setShipStrategy(ShipStrategyType.FORWARD);
+ toMap2.setLocalStrategy(LocalStrategy.NONE);
+ SingleInputPlanNode map2 = new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
+
+ Channel toJoin1 = new Channel(map1);
+ Channel toJoin2 = new Channel(map2);
+
+ DualInputPlanNode join = new DualInputPlanNode(getJoinNode(), "Join", toJoin1, toJoin2, DriverStrategy.HYBRIDHASH_BUILD_FIRST);
+
+ Channel toAfterJoin = new Channel(join);
+ toAfterJoin.setShipStrategy(ShipStrategyType.FORWARD);
+ toAfterJoin.setLocalStrategy(LocalStrategy.NONE);
+ SingleInputPlanNode afterJoin = new SingleInputPlanNode(getMapNode(), "After Join Mapper", toAfterJoin, DriverStrategy.MAP);
+
+ // attach some properties to the non-relevant input
+ {
+ toMap2.setShipStrategy(ShipStrategyType.BROADCAST);
+ toMap2.setLocalStrategy(LocalStrategy.SORT, new FieldList(2, 7), new boolean[] {true, true});
+
+ RequestedGlobalProperties joinGp = new RequestedGlobalProperties();
+ joinGp.setFullyReplicated();
+
+ RequestedLocalProperties joinLp = new RequestedLocalProperties();
+ joinLp.setOrdering(new Ordering(2, null, Order.ASCENDING).appendOrdering(7, null, Order.ASCENDING));
+
+ toJoin2.setShipStrategy(ShipStrategyType.FORWARD);
+ toJoin2.setLocalStrategy(LocalStrategy.NONE);
+ toJoin2.setRequiredGlobalProps(joinGp);
+ toJoin2.setRequiredLocalProps(joinLp);
+ }
+
+ // ------------------------------------------------------------------------------------
+
+ // no properties from the partial solution, no required properties
+ {
+ toJoin1.setShipStrategy(ShipStrategyType.FORWARD);
+ toJoin1.setLocalStrategy(LocalStrategy.NONE);
+
+ GlobalProperties gp = new GlobalProperties();
+ LocalProperties lp = LocalProperties.EMPTY;
+
+ FeedbackPropertiesMeetRequirementsReport report = join.checkPartialSolutionPropertiesMet(target, gp, lp);
+ assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+ }
+
+ // some properties from the partial solution, no required properties
+ {
+ toJoin1.setShipStrategy(ShipStrategyType.FORWARD);
+ toJoin1.setLocalStrategy(LocalStrategy.NONE);
+
+ GlobalProperties gp = new GlobalProperties();
+ gp.setHashPartitioned(new FieldList(0));
+ LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+
+ FeedbackPropertiesMeetRequirementsReport report = join.checkPartialSolutionPropertiesMet(target, gp, lp);
+ assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+ }
+
+
+ // produced properties match relevant input
+ {
+ GlobalProperties gp = new GlobalProperties();
+ gp.setHashPartitioned(new FieldList(0));
+ LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+
+ RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+ rgp.setHashPartitioned(new FieldList(0));
+
+ RequestedLocalProperties rlp = new RequestedLocalProperties();
+ rlp.setGroupedFields(new FieldList(2));
+
+ toJoin1.setRequiredGlobalProps(rgp);
+ toJoin1.setRequiredLocalProps(rlp);
+
+ toJoin1.setShipStrategy(ShipStrategyType.FORWARD);
+ toJoin1.setLocalStrategy(LocalStrategy.NONE);
+
+ FeedbackPropertiesMeetRequirementsReport report = join.checkPartialSolutionPropertiesMet(target, gp, lp);
+ assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+ }
+
+ // produced properties do not match relevant input
+ {
+ GlobalProperties gp = new GlobalProperties();
+ gp.setHashPartitioned(new FieldList(0));
+ LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+
+ RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+ rgp.setHashPartitioned(new FieldList(0));
+
+ RequestedLocalProperties rlp = new RequestedLocalProperties();
+ rlp.setGroupedFields(new FieldList(1, 2, 3));
+
+ toJoin1.setRequiredGlobalProps(rgp);
+ toJoin1.setRequiredLocalProps(rlp);
+
+ toJoin1.setShipStrategy(ShipStrategyType.FORWARD);
+ toJoin1.setLocalStrategy(LocalStrategy.NONE);
+
+ FeedbackPropertiesMeetRequirementsReport report = join.checkPartialSolutionPropertiesMet(target, gp, lp);
+ assertEquals(NOT_MET, report);
+ }
+
+ // produced properties overridden before join
+ {
+ GlobalProperties gp = new GlobalProperties();
+ gp.setHashPartitioned(new FieldList(0));
+ LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+
+ RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+ rgp.setHashPartitioned(new FieldList(0));
+
+ RequestedLocalProperties rlp = new RequestedLocalProperties();
+ rlp.setGroupedFields(new FieldList(2, 1));
+
+ toMap1.setRequiredGlobalProps(rgp);
+ toMap1.setRequiredLocalProps(rlp);
+
+ toJoin1.setRequiredGlobalProps(null);
+ toJoin1.setRequiredLocalProps(null);
+
+ toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(2, 1));
+ toJoin1.setLocalStrategy(LocalStrategy.SORT, new FieldList(7, 3), new boolean[] {true, false});
+
+ FeedbackPropertiesMeetRequirementsReport report = join.checkPartialSolutionPropertiesMet(target, gp, lp);
+ assertEquals(MET, report);
+ }
+
+ // produced properties before join match, after join match as well
+ {
+ GlobalProperties gp = new GlobalProperties();
+ gp.setHashPartitioned(new FieldList(0));
+ LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+
+ RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+ rgp.setHashPartitioned(new FieldList(0));
+
+ RequestedLocalProperties rlp = new RequestedLocalProperties();
+ rlp.setGroupedFields(new FieldList(2, 1));
+
+ toMap1.setRequiredGlobalProps(null);
+ toMap1.setRequiredLocalProps(null);
+
+ toJoin1.setShipStrategy(ShipStrategyType.FORWARD);
+ toJoin1.setLocalStrategy(LocalStrategy.NONE);
+
+ toJoin1.setRequiredGlobalProps(rgp);
+ toJoin1.setRequiredLocalProps(rlp);
+
+ toAfterJoin.setShipStrategy(ShipStrategyType.FORWARD);
+ toAfterJoin.setLocalStrategy(LocalStrategy.NONE);
+
+ toAfterJoin.setRequiredGlobalProps(rgp);
+ toAfterJoin.setRequiredLocalProps(rlp);
+
+ // start the check after the join, otherwise the requirements on toAfterJoin are never evaluated
+ FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
+ assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+ }
+
+ // produced properties before join match, after join do not match
+ {
+ GlobalProperties gp = new GlobalProperties();
+ gp.setHashPartitioned(new FieldList(0));
+ LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+
+ RequestedGlobalProperties rgp1 = new RequestedGlobalProperties();
+ rgp1.setHashPartitioned(new FieldList(0));
+
+ RequestedGlobalProperties rgp2 = new RequestedGlobalProperties();
+ rgp2.setHashPartitioned(new FieldList(3));
+
+ RequestedLocalProperties rlp1 = new RequestedLocalProperties();
+ rlp1.setGroupedFields(new FieldList(2, 1));
+
+ RequestedLocalProperties rlp2 = new RequestedLocalProperties();
+ rlp2.setGroupedFields(new FieldList(3, 4));
+
+ toJoin1.setRequiredGlobalProps(rgp1);
+ toJoin1.setRequiredLocalProps(rlp1);
+
+ toAfterJoin.setShipStrategy(ShipStrategyType.FORWARD);
+ toAfterJoin.setLocalStrategy(LocalStrategy.NONE);
+
+ toAfterJoin.setRequiredGlobalProps(rgp2);
+ toAfterJoin.setRequiredLocalProps(rlp2);
+
+ FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
+ assertEquals(NOT_MET, report);
+ }
+
+ // produced properties are overridden, does not matter that they do not match
+ {
+ GlobalProperties gp = new GlobalProperties();
+ gp.setAnyPartitioning(new FieldList(0));
+ LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+
+ RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+ rgp.setHashPartitioned(new FieldList(1));
+
+ RequestedLocalProperties rlp = new RequestedLocalProperties();
+ rlp.setGroupedFields(new FieldList(1, 2, 3));
+
+ toJoin1.setRequiredGlobalProps(null);
+ toJoin1.setRequiredLocalProps(null);
+
+ toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(2, 1));
+ toJoin1.setLocalStrategy(LocalStrategy.SORT, new FieldList(7, 3), new boolean[] {true, false});
+
+ toAfterJoin.setRequiredGlobalProps(rgp);
+ toAfterJoin.setRequiredLocalProps(rlp);
+
+ FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
+ assertEquals(MET, report);
+ }
+
+ // local property overridden before join, local property mismatch after join not relevant
+ {
+ GlobalProperties gp = new GlobalProperties();
+ gp.setAnyPartitioning(new FieldList(0));
+ LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+
+ RequestedLocalProperties rlp = new RequestedLocalProperties();
+ rlp.setGroupedFields(new FieldList(1, 2, 3));
+
+ toJoin1.setRequiredGlobalProps(null);
+ toJoin1.setRequiredLocalProps(null);
+
+ toJoin1.setShipStrategy(ShipStrategyType.FORWARD);
+ toJoin1.setLocalStrategy(LocalStrategy.SORT, new FieldList(7, 3), new boolean[] {true, false});
+
+ toAfterJoin.setRequiredGlobalProps(null);
+ toAfterJoin.setRequiredLocalProps(rlp);
+
+ FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
+ assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+ }
+
+ // local property overridden before join, global property mismatch after join voids the match
+ {
+ GlobalProperties gp = new GlobalProperties();
+ gp.setAnyPartitioning(new FieldList(0));
+ LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+
+ RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+ rgp.setHashPartitioned(new FieldList(1));
+
+ RequestedLocalProperties rlp = new RequestedLocalProperties();
+ rlp.setGroupedFields(new FieldList(1, 2, 3));
+
+ toJoin1.setRequiredGlobalProps(null);
+ toJoin1.setRequiredLocalProps(null);
+
+ toJoin1.setShipStrategy(ShipStrategyType.FORWARD);
+ toJoin1.setLocalStrategy(LocalStrategy.SORT, new FieldList(7, 3), new boolean[] {true, false});
+
+ toAfterJoin.setRequiredGlobalProps(rgp);
+ toAfterJoin.setRequiredLocalProps(rlp);
+
+ FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
+ assertEquals(NOT_MET, report);
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Join whose two inputs both descend from the partial solution:
+ * <pre>
+ * Partial Solution -> Mapper 1 --\
+ *                                 Join -> After Join Mapper
+ * Partial Solution -> Mapper 2 --/
+ * </pre>
+ * The channels are shared across the scenarios below, so settings made in one scenario stay in
+ * effect for the following ones unless they are explicitly overwritten.
+ */
+ @Test
+ public void testTwoOperatorsBothDependent() {
+ try {
+ SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Partial Solution");
+
+ Channel toMap1 = new Channel(target);
+ toMap1.setShipStrategy(ShipStrategyType.FORWARD);
+ toMap1.setLocalStrategy(LocalStrategy.NONE);
+ SingleInputPlanNode map1 = new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
+
+ Channel toMap2 = new Channel(target);
+ toMap2.setShipStrategy(ShipStrategyType.FORWARD);
+ toMap2.setLocalStrategy(LocalStrategy.NONE);
+ SingleInputPlanNode map2 = new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
+
+ Channel toJoin1 = new Channel(map1);
+ toJoin1.setShipStrategy(ShipStrategyType.FORWARD);
+ toJoin1.setLocalStrategy(LocalStrategy.NONE);
+
+ Channel toJoin2 = new Channel(map2);
+ toJoin2.setShipStrategy(ShipStrategyType.FORWARD);
+ toJoin2.setLocalStrategy(LocalStrategy.NONE);
+
+ DualInputPlanNode join = new DualInputPlanNode(getJoinNode(), "Join", toJoin1, toJoin2, DriverStrategy.HYBRIDHASH_BUILD_FIRST);
+
+ Channel toAfterJoin = new Channel(join);
+ toAfterJoin.setShipStrategy(ShipStrategyType.FORWARD);
+ toAfterJoin.setLocalStrategy(LocalStrategy.NONE);
+ SingleInputPlanNode afterJoin = new SingleInputPlanNode(getMapNode(), "After Join Mapper", toAfterJoin, DriverStrategy.MAP);
+
+ // no properties from the partial solution, no required properties
+ {
+ GlobalProperties gp = new GlobalProperties();
+ LocalProperties lp = LocalProperties.EMPTY;
+
+ FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
+ assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+ }
+
+ // some properties from the partial solution, no required properties
+ {
+ GlobalProperties gp = new GlobalProperties();
+ gp.setHashPartitioned(new FieldList(0));
+ LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+
+ FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
+ assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+ }
+
+ // test requirements on one input and met
+ {
+ GlobalProperties gp = new GlobalProperties();
+ gp.setHashPartitioned(new FieldList(0));
+ LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+
+ RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+ rgp.setHashPartitioned(new FieldList(0));
+
+ RequestedLocalProperties rlp = new RequestedLocalProperties();
+ rlp.setGroupedFields(new FieldList(2, 1));
+
+ toJoin1.setRequiredGlobalProps(rgp);
+ toJoin1.setRequiredLocalProps(rlp);
+
+ FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
+ assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+ }
+
+ // test requirements on both inputs and met
+ {
+ GlobalProperties gp = new GlobalProperties();
+ gp.setHashPartitioned(new FieldList(0));
+ LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+
+ RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+ rgp.setHashPartitioned(new FieldList(0));
+
+ RequestedLocalProperties rlp = new RequestedLocalProperties();
+ rlp.setGroupedFields(new FieldList(2, 1));
+
+ toJoin1.setRequiredGlobalProps(rgp);
+ toJoin1.setRequiredLocalProps(rlp);
+
+ toJoin2.setRequiredGlobalProps(rgp);
+ toJoin2.setRequiredLocalProps(rlp);
+
+ FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
+ assertTrue(report != null && report != NO_PARTIAL_SOLUTION && report != NOT_MET);
+ }
+
+ // test requirements on both inputs, one not met
+ {
+ GlobalProperties gp = new GlobalProperties();
+ gp.setHashPartitioned(new FieldList(0));
+ LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+
+ RequestedGlobalProperties rgp1 = new RequestedGlobalProperties();
+ rgp1.setHashPartitioned(new FieldList(0));
+
+ RequestedLocalProperties rlp1 = new RequestedLocalProperties();
+ rlp1.setGroupedFields(new FieldList(2, 1));
+
+ RequestedGlobalProperties rgp2 = new RequestedGlobalProperties();
+ rgp2.setHashPartitioned(new FieldList(1));
+
+ RequestedLocalProperties rlp2 = new RequestedLocalProperties();
+ rlp2.setGroupedFields(new FieldList(0, 3));
+
+ toJoin1.setRequiredGlobalProps(rgp1);
+ toJoin1.setRequiredLocalProps(rlp1);
+
+ toJoin2.setRequiredGlobalProps(rgp2);
+ toJoin2.setRequiredLocalProps(rlp2);
+
+ FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
+ assertEquals(NOT_MET, report);
+ }
+
+ // test override on both inputs, later requirement ignored
+ {
+ GlobalProperties gp = new GlobalProperties();
+ gp.setHashPartitioned(new FieldList(0));
+ LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+
+ RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+ rgp.setHashPartitioned(new FieldList(1));
+
+ RequestedLocalProperties rlp = new RequestedLocalProperties();
+ rlp.setGroupedFields(new FieldList(0, 3));
+
+ toJoin1.setRequiredGlobalProps(null);
+ toJoin1.setRequiredLocalProps(null);
+
+ toJoin2.setRequiredGlobalProps(null);
+ toJoin2.setRequiredLocalProps(null);
+
+ toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(88));
+ toJoin2.setShipStrategy(ShipStrategyType.BROADCAST);
+
+ toAfterJoin.setRequiredGlobalProps(rgp);
+ toAfterJoin.setRequiredLocalProps(rlp);
+
+ FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
+ assertEquals(MET, report);
+ }
+
+ // test override on one input, later requirement met
+ {
+ GlobalProperties gp = new GlobalProperties();
+ gp.setHashPartitioned(new FieldList(0));
+ LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+
+ RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+ rgp.setHashPartitioned(new FieldList(0));
+
+ RequestedLocalProperties rlp = new RequestedLocalProperties();
+ rlp.setGroupedFields(new FieldList(2, 1));
+
+ toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(88));
+ toJoin2.setShipStrategy(ShipStrategyType.FORWARD);
+
+ toAfterJoin.setRequiredGlobalProps(rgp);
+ toAfterJoin.setRequiredLocalProps(rlp);
+
+ FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
+ assertEquals(PENDING, report);
+ }
+
+ // test override on one input, later requirement not met
+ {
+ GlobalProperties gp = new GlobalProperties();
+ gp.setHashPartitioned(new FieldList(0));
+ LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+
+ RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+ rgp.setHashPartitioned(new FieldList(3));
+
+ RequestedLocalProperties rlp = new RequestedLocalProperties();
+ rlp.setGroupedFields(new FieldList(77, 69));
+
+ toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(88));
+ toJoin2.setShipStrategy(ShipStrategyType.FORWARD);
+
+ toAfterJoin.setRequiredGlobalProps(rgp);
+ toAfterJoin.setRequiredLocalProps(rlp);
+
+ FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
+ assertEquals(NOT_MET, report);
+ }
+
+ // test override on one input locally, later global requirement not met
+ {
+ GlobalProperties gp = new GlobalProperties();
+ gp.setHashPartitioned(new FieldList(0));
+ LocalProperties lp = LocalProperties.forGrouping(new FieldList(2, 1));
+
+ RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+ rgp.setHashPartitioned(new FieldList(3));
+
+ // only the first input re-sorts locally ...
+ toJoin1.setShipStrategy(ShipStrategyType.FORWARD);
+ toJoin1.setLocalStrategy(LocalStrategy.SORT, new FieldList(3), new boolean[] { false });
+
+ // ... the second input keeps the feedback properties (must not reset toJoin1's sort)
+ toJoin2.setShipStrategy(ShipStrategyType.FORWARD);
+ toJoin2.setLocalStrategy(LocalStrategy.NONE);
+
+ toAfterJoin.setRequiredGlobalProps(rgp);
+ toAfterJoin.setRequiredLocalProps(null);
+
+ FeedbackPropertiesMeetRequirementsReport report = afterJoin.checkPartialSolutionPropertiesMet(target, gp, lp);
+ assertEquals(NOT_MET, report);
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private static final DataSourceNode getSourceNode() {
+ return new DataSourceNode(new GenericDataSourceBase<String, TextInputFormat>(new TextInputFormat(new Path("/")), new OperatorInformation<String>(BasicTypeInfo.STRING_TYPE_INFO)));
+ }
+
+ private static final MapNode getMapNode() {
+ return new MapNode(new MapOperatorBase<String, String, GenericMap<String,String>>(new IdentityMapper<String>(), new UnaryOperatorInformation<String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), "map op"));
+ }
+
+ private static final MatchNode getJoinNode() {
+ return new MatchNode(new JoinOperatorBase<String, String, String, GenericJoiner<String, String, String>>(new DummyJoinFunction<String>(), new BinaryOperatorInformation<String, String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), new int[] {1}, new int[] {2}, "join op"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/IterationsCompilerTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/IterationsCompilerTest.java
index a123099..05a863c 100644
--- a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/IterationsCompilerTest.java
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/IterationsCompilerTest.java
@@ -38,7 +38,6 @@ import eu.stratosphere.compiler.plan.BulkIterationPlanNode;
import eu.stratosphere.compiler.plan.Channel;
import eu.stratosphere.compiler.plan.OptimizedPlan;
import eu.stratosphere.compiler.plan.WorksetIterationPlanNode;
-import eu.stratosphere.compiler.plandump.PlanJSONDumpGenerator;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
import eu.stratosphere.util.Collector;
@@ -164,8 +163,6 @@ public class IterationsCompilerTest extends CompilerTestBase {
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
- System.out.println(new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(op));
-
assertEquals(1, op.getDataSinks().size());
assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof BulkIterationPlanNode);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/testfunctions/DummyJoinFunction.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/testfunctions/DummyJoinFunction.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/testfunctions/DummyJoinFunction.java
new file mode 100644
index 0000000..aed8f26
--- /dev/null
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/testfunctions/DummyJoinFunction.java
@@ -0,0 +1,28 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.pact.compiler.testfunctions;
+
+import eu.stratosphere.api.java.functions.JoinFunction;
+
+/**
+ * Placeholder join function for compiler tests: it ignores both inputs and always returns {@code null}.
+ *
+ * @param <T> type of both inputs and of the result
+ */
+public class DummyJoinFunction<T> extends JoinFunction<T, T, T> {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Discards the given pair of records.
+ *
+ * @return always {@code null}
+ */
+ @Override
+ public T join(T first, T second) {
+ final T nothing = null;
+ return nothing;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/graph/PageRankBasic.java
----------------------------------------------------------------------
diff --git a/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/graph/PageRankBasic.java b/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/graph/PageRankBasic.java
index 00764d8..3be5009 100644
--- a/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/graph/PageRankBasic.java
+++ b/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/graph/PageRankBasic.java
@@ -24,6 +24,7 @@ import eu.stratosphere.api.java.ExecutionEnvironment;
import eu.stratosphere.api.java.IterativeDataSet;
import eu.stratosphere.api.java.functions.FilterFunction;
import eu.stratosphere.api.java.functions.FlatMapFunction;
+import eu.stratosphere.api.java.functions.FunctionAnnotation.ConstantFields;
import eu.stratosphere.api.java.functions.GroupReduceFunction;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
@@ -94,7 +95,7 @@ public class PageRankBasic {
DataSet<Tuple2<Long, Double>> pagesWithRanks = pagesInput.
map(new RankAssigner((1.0d / numPages)));
- // build adjecency list from link input
+ // build adjacency list from link input
DataSet<Tuple2<Long, Long[]>> adjacencyListInput =
linksInput.groupBy(0).reduceGroup(new BuildOutgoingEdgeList());
@@ -150,8 +151,9 @@ public class PageRankBasic {
/**
* A reduce function that takes a sequence of edges and builds the adjacency list for the vertex where the edges
- * originate. Run as a preprocessing step.
+ * originate. Run as a pre-processing step.
*/
+ @ConstantFields("0")
public static final class BuildOutgoingEdgeList extends GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long[]>> {
private final ArrayList<Long> neighbors = new ArrayList<Long>();
@@ -190,6 +192,7 @@ public class PageRankBasic {
/**
* The function that applies the page rank dampening formula
*/
+ @ConstantFields("0")
public static final class Dampener extends MapFunction<Tuple2<Long,Double>, Tuple2<Long,Double>> {
private final double dampening;
@@ -275,5 +278,4 @@ public class PageRankBasic {
return PageRankData.getDefaultEdgeDataSet(env);
}
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
index 882c3a9..e4f0a4b 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
@@ -14,6 +14,7 @@
package eu.stratosphere.pact.runtime.cache;
import eu.stratosphere.api.common.cache.DistributedCache;
+
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
@@ -64,7 +65,7 @@ public class FileCache {
*/
public FutureTask<Path> createTmpFile(String name, DistributedCacheEntry entry, JobID jobID) {
synchronized (count) {
- Pair<JobID, String> key = new ImmutablePair(jobID, name);
+ Pair<JobID, String> key = new ImmutablePair<JobID, String>(jobID, name);
if (count.containsKey(key)) {
count.put(key, count.get(key) + 1);
} else {
@@ -84,7 +85,7 @@ public class FileCache {
* @param jobID
*/
public void deleteTmpFile(String name, DistributedCacheEntry entry, JobID jobID) {
- DeleteProcess dp = new DeleteProcess(name, entry, jobID, count.get(new ImmutablePair(jobID,name)));
+ DeleteProcess dp = new DeleteProcess(name, entry, jobID, count.get(new ImmutablePair<JobID, String>(jobID,name)));
executorService.schedule(dp, 5000L, TimeUnit.MILLISECONDS);
}
@@ -138,6 +139,7 @@ public class FileCache {
*/
private class CopyProcess implements Callable<Path> {
private JobID jobID;
+ @SuppressWarnings("unused")
private String name;
private String filePath;
private Boolean executable;
@@ -167,6 +169,7 @@ public class FileCache {
*/
private class DeleteProcess implements Runnable {
private String name;
+ @SuppressWarnings("unused")
private String filePath;
private JobID jobID;
private int oldCount;
@@ -180,7 +183,7 @@ public class FileCache {
@Override
public void run() {
synchronized (count) {
- if (count.get(new ImmutablePair(jobID, name)) != oldCount) {
+ if (count.get(new ImmutablePair<JobID, String>(jobID, name)) != oldCount) {
return;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java
index a86c209..a8a1293 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java
@@ -25,7 +25,7 @@ public enum ShipStrategyType {
NONE(false, false, false),
/**
- * Forwarding the data locally in memory.
+ * Forwarding the data preserving all global properties.
*/
FORWARD(false, false, false),
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManagerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManagerTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManagerTest.java
index c380431..d23b0f3 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManagerTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManagerTest.java
@@ -124,7 +124,7 @@ public class NettyConnectionManagerTest {
}
- private class VerifyEnvelopes implements Answer {
+ private class VerifyEnvelopes implements Answer<Object> {
private final ConcurrentMap<ChannelID, Integer> received = new ConcurrentHashMap<ChannelID, Integer>();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/iterations/ConnectedComponentsTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/iterations/ConnectedComponentsTest.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/iterations/ConnectedComponentsTest.java
index 3f4d258..9c8c0c8 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/iterations/ConnectedComponentsTest.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/iterations/ConnectedComponentsTest.java
@@ -102,7 +102,7 @@ public class ConnectedComponentsTest extends CompilerTestBase {
Assert.assertEquals(DriverStrategy.NONE, vertexSource.getDriverStrategy());
Assert.assertEquals(DriverStrategy.NONE, edgesSource.getDriverStrategy());
-// Assert.assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, neighborsJoin.getDriverStrategy());
+ Assert.assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, neighborsJoin.getDriverStrategy());
Assert.assertEquals(set0, neighborsJoin.getKeysForInput1());
Assert.assertEquals(set0, neighborsJoin.getKeysForInput2());
@@ -182,7 +182,7 @@ public class ConnectedComponentsTest extends CompilerTestBase {
Assert.assertEquals(DriverStrategy.NONE, vertexSource.getDriverStrategy());
Assert.assertEquals(DriverStrategy.NONE, edgesSource.getDriverStrategy());
-// Assert.assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, neighborsJoin.getDriverStrategy());
+ Assert.assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, neighborsJoin.getDriverStrategy());
Assert.assertEquals(set0, neighborsJoin.getKeysForInput1());
Assert.assertEquals(set0, neighborsJoin.getKeysForInput2());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/iterations/PageRankCompilerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/iterations/PageRankCompilerTest.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/iterations/PageRankCompilerTest.java
new file mode 100644
index 0000000..547a453
--- /dev/null
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/iterations/PageRankCompilerTest.java
@@ -0,0 +1,108 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.test.compiler.iterations;
+
+import static eu.stratosphere.api.java.aggregation.Aggregations.SUM;
+import static org.junit.Assert.fail;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import eu.stratosphere.api.common.Plan;
+import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.ExecutionEnvironment;
+import eu.stratosphere.api.java.IterativeDataSet;
+import eu.stratosphere.api.java.tuple.Tuple1;
+import eu.stratosphere.api.java.tuple.Tuple2;
+import eu.stratosphere.compiler.PactCompiler;
+import eu.stratosphere.compiler.plan.BulkIterationPlanNode;
+import eu.stratosphere.compiler.plan.BulkPartialSolutionPlanNode;
+import eu.stratosphere.compiler.plan.OptimizedPlan;
+import eu.stratosphere.compiler.plan.SinkPlanNode;
+import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.example.java.graph.PageRankBasic.BuildOutgoingEdgeList;
+import eu.stratosphere.example.java.graph.PageRankBasic.Dampener;
+import eu.stratosphere.example.java.graph.PageRankBasic.EpsilonFilter;
+import eu.stratosphere.example.java.graph.PageRankBasic.JoinVertexWithEdgesMatch;
+import eu.stratosphere.example.java.graph.PageRankBasic.RankAssigner;
+import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
+import eu.stratosphere.pact.runtime.task.util.LocalStrategy;
+import eu.stratosphere.test.compiler.util.CompilerTestBase;
+
+public class PageRankCompilerTest extends CompilerTestBase {
+
+ /**
+ * Compiles (without executing) a bulk-iteration PageRank program and checks the optimizer's
+ * placement of the partitioning: the iteration input must be hash-partitioned with no local
+ * strategy, i.e. the partitioning is pushed out of the loop, and the partial solution's first
+ * outgoing channel must use the FORWARD ship strategy.
+ */
+ @Test
+ public void testPageRank() {
+ try {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ // get input data (tiny inputs suffice: the plan is only compiled, never executed)
+ @SuppressWarnings("unchecked")
+ DataSet<Tuple1<Long>> pagesInput = env.fromElements(new Tuple1<Long>(1L));
+ @SuppressWarnings("unchecked")
+ DataSet<Tuple2<Long, Long>> linksInput = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
+
+ // assign initial rank to pages
+ DataSet<Tuple2<Long, Double>> pagesWithRanks = pagesInput
+ .map(new RankAssigner((1.0d / 10)));
+
+ // build adjacency list from link input
+ DataSet<Tuple2<Long, Long[]>> adjacencyListInput =
+ linksInput.groupBy(0).reduceGroup(new BuildOutgoingEdgeList());
+
+ // set iterative data set
+ IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(10);
+
+ // hint the join to use a hash strategy that builds on its second (adjacency list) input
+ Configuration cfg = new Configuration();
+ cfg.setString(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND);
+
+ DataSet<Tuple2<Long, Double>> newRanks = iteration
+ // join pages with outgoing edges and distribute rank
+ .join(adjacencyListInput).where(0).equalTo(0).withParameters(cfg)
+ .flatMap(new JoinVertexWithEdgesMatch())
+ // collect and sum ranks
+ .groupBy(0).aggregate(SUM, 1)
+ // apply dampening factor
+ .map(new Dampener(0.85, 10));
+
+ DataSet<Tuple2<Long, Double>> finalPageRanks = iteration.closeWith(
+ newRanks,
+ newRanks.join(iteration).where(0).equalTo(0)
+ // termination condition
+ .filter(new EpsilonFilter()));
+
+ finalPageRanks.print();
+
+ // get the plan and compile it
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ SinkPlanNode sinkPlanNode = (SinkPlanNode) op.getDataSinks().iterator().next();
+ BulkIterationPlanNode iterPlanNode = (BulkIterationPlanNode) sinkPlanNode.getInput().getSource();
+
+ // check that the partitioning is pushed out of the first loop
+ Assert.assertEquals(ShipStrategyType.PARTITION_HASH, iterPlanNode.getInput().getShipStrategy());
+ Assert.assertEquals(LocalStrategy.NONE, iterPlanNode.getInput().getLocalStrategy());
+
+ BulkPartialSolutionPlanNode partSolPlanNode = iterPlanNode.getPartialSolutionPlanNode();
+ Assert.assertEquals(ShipStrategyType.FORWARD, partSolPlanNode.getOutgoingChannels().get(0).getShipStrategy());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
[3/6] git commit: Made FieldSet and FieldList immutable collections
to prevent bugs due to side-effect mutations
Posted by se...@apache.org.
Made FieldSet and FieldList immutable collections to prevent bugs due to side-effect mutations
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/202aa4a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/202aa4a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/202aa4a7
Branch: refs/heads/master
Commit: 202aa4a7a20d0029096a341ae4ba05e16aaab83b
Parents: 31a3739
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Jun 16 07:47:54 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jun 17 02:07:37 2014 +0200
----------------------------------------------------------------------
.../pact/compiler/GroupOrderTest.java | 3 +-
.../operators/DualInputSemanticProperties.java | 60 +++----
.../api/common/operators/Ordering.java | 13 +-
.../common/operators/SemanticProperties.java | 5 +-
.../SingleInputSemanticProperties.java | 30 ++--
.../api/common/operators/util/FieldList.java | 99 +++++++++--
.../api/common/operators/util/FieldSet.java | 176 ++++++++++++++-----
.../common/operators/util/FieldListTest.java | 105 +++++++++++
.../api/common/operators/util/FieldSetTest.java | 110 ++++++++++++
.../api/java/functions/SemanticPropUtil.java | 4 +-
.../stratosphere/api/scala/CompilerHints.scala | 17 +-
11 files changed, 495 insertions(+), 127 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/202aa4a7/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/GroupOrderTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/GroupOrderTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/GroupOrderTest.java
index da72c94..e170203 100644
--- a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/GroupOrderTest.java
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/GroupOrderTest.java
@@ -87,8 +87,7 @@ public class GroupOrderTest extends CompilerTestBase {
Assert.assertEquals(LocalStrategy.SORT, c.getLocalStrategy());
FieldList ship = new FieldList(2);
- FieldList local = new FieldList(2);
- local.add(5);
+ FieldList local = new FieldList(2, 5);
Assert.assertEquals(ship, c.getShipStrategyKeys());
Assert.assertEquals(local, c.getLocalStrategyKeys());
Assert.assertTrue(c.getLocalStrategySortOrder()[0] == reducer.getSortOrders()[0]);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/202aa4a7/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/DualInputSemanticProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/DualInputSemanticProperties.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/DualInputSemanticProperties.java
index c6b22f2..1ed399a 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/DualInputSemanticProperties.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/DualInputSemanticProperties.java
@@ -63,13 +63,13 @@ public class DualInputSemanticProperties extends SemanticProperties {
* @param destinationField the position in the destination record(s)
*/
public void addForwardedField1(int sourceField, int destinationField) {
- FieldSet fs;
- if((fs = this.forwardedFields1.get(sourceField)) != null) {
- fs.add(destinationField);
- } else {
- fs = new FieldSet(destinationField);
- this.forwardedFields1.put(sourceField, fs);
+ FieldSet old = this.forwardedFields1.get(sourceField);
+ if (old == null) {
+ old = FieldSet.EMPTY_SET;
}
+
+ FieldSet fs = old.addField(destinationField);
+ this.forwardedFields1.put(sourceField, fs);
}
/**
@@ -81,13 +81,13 @@ public class DualInputSemanticProperties extends SemanticProperties {
* @param destinationFields the position in the destination record(s)
*/
public void addForwardedField1(int sourceField, FieldSet destinationFields) {
- FieldSet fs;
- if((fs = this.forwardedFields1.get(sourceField)) != null) {
- fs.addAll(destinationFields);
- } else {
- fs = new FieldSet(destinationFields);
- this.forwardedFields1.put(sourceField, fs);
+ FieldSet old = this.forwardedFields1.get(sourceField);
+ if (old == null) {
+ old = FieldSet.EMPTY_SET;
}
+
+ FieldSet fs = old.addFields(destinationFields);
+ this.forwardedFields1.put(sourceField, fs);
}
/**
@@ -122,13 +122,13 @@ public class DualInputSemanticProperties extends SemanticProperties {
* @param destinationField the position in the destination record(s)
*/
public void addForwardedField2(int sourceField, int destinationField) {
- FieldSet fs;
- if((fs = this.forwardedFields2.get(sourceField)) != null) {
- fs.add(destinationField);
- } else {
- fs = new FieldSet(destinationField);
- this.forwardedFields2.put(sourceField, fs);
+ FieldSet old = this.forwardedFields2.get(sourceField);
+ if (old == null) {
+ old = FieldSet.EMPTY_SET;
}
+
+ FieldSet fs = old.addField(destinationField);
+ this.forwardedFields2.put(sourceField, fs);
}
/**
@@ -140,13 +140,13 @@ public class DualInputSemanticProperties extends SemanticProperties {
* @param destinationFields the position in the destination record(s)
*/
public void addForwardedField2(int sourceField, FieldSet destinationFields) {
- FieldSet fs;
- if((fs = this.forwardedFields2.get(sourceField)) != null) {
- fs.addAll(destinationFields);
- } else {
- fs = new FieldSet(destinationFields);
- this.forwardedFields2.put(sourceField, fs);
+ FieldSet old = this.forwardedFields2.get(sourceField);
+ if (old == null) {
+ old = FieldSet.EMPTY_SET;
}
+
+ FieldSet fs = old.addFields(destinationFields);
+ this.forwardedFields2.put(sourceField, fs);
}
/**
@@ -179,10 +179,10 @@ public class DualInputSemanticProperties extends SemanticProperties {
* @param readFields the position(s) in the source record(s)
*/
public void addReadFields1(FieldSet readFields) {
- if(this.readFields1 == null) {
- this.readFields1 = new FieldSet(readFields);
+ if (this.readFields1 == null) {
+ this.readFields1 = readFields;
} else {
- this.readFields1.addAll(readFields);
+ // FieldSet is immutable: addFields returns a new set, which must be reassigned.
+ // Merge into readFields1 (not readFields2) so the first input keeps its own read fields.
+ this.readFields1 = this.readFields1.addFields(readFields);
}
}
@@ -213,10 +213,10 @@ public class DualInputSemanticProperties extends SemanticProperties {
* @param readFields the position(s) in the source record(s)
*/
public void addReadFields2(FieldSet readFields) {
- if(this.readFields2 == null) {
- this.readFields2 = new FieldSet(readFields);
+ if (this.readFields2 == null) {
+ this.readFields2 = readFields;
} else {
- this.readFields2.addAll(readFields);
+ this.readFields2 = this.readFields2.addFields(readFields);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/202aa4a7/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/Ordering.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/Ordering.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/Ordering.java
index f63bdb9..a9895ec 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/Ordering.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/Ordering.java
@@ -24,7 +24,7 @@ import eu.stratosphere.types.Key;
*/
public class Ordering {
- protected final FieldList indexes = new FieldList();
+ protected FieldList indexes = new FieldList();
protected final ArrayList<Class<? extends Key<?>>> types = new ArrayList<Class<? extends Key<?>>>();
@@ -33,7 +33,7 @@ public class Ordering {
// --------------------------------------------------------------------------------------------
/**
- *
+ * Creates an empty ordering.
*/
public Ordering() {}
@@ -66,7 +66,7 @@ public class Ordering {
throw new IllegalArgumentException("An ordering must not be created with a NONE order.");
}
- this.indexes.add(index);
+ this.indexes = this.indexes.addField(index);
this.types.add(type);
this.orders.add(order);
return this;
@@ -210,10 +210,9 @@ public class Ordering {
- public Ordering clone()
- {
- final Ordering newOrdering = new Ordering();
- newOrdering.indexes.addAll(this.indexes);
+ public Ordering clone() {
+ Ordering newOrdering = new Ordering();
+ newOrdering.indexes = this.indexes;
newOrdering.types.addAll(this.types);
newOrdering.orders.addAll(this.orders);
return newOrdering;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/202aa4a7/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SemanticProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SemanticProperties.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SemanticProperties.java
index 8c375fc..50af69f 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SemanticProperties.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SemanticProperties.java
@@ -21,6 +21,7 @@ import eu.stratosphere.api.common.operators.util.FieldSet;
* Container for the semantic properties associated to an operator.
*/
public abstract class SemanticProperties implements Serializable {
+
private static final long serialVersionUID = 1L;
/**
@@ -36,9 +37,9 @@ public abstract class SemanticProperties implements Serializable {
*/
public void addWrittenFields(FieldSet writtenFields) {
if(this.writtenFields == null) {
- this.writtenFields = new FieldSet(writtenFields);
+ this.writtenFields = writtenFields;
} else {
- this.writtenFields.addAll(writtenFields);
+ this.writtenFields = this.writtenFields.addFields(writtenFields);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/202aa4a7/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SingleInputSemanticProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SingleInputSemanticProperties.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SingleInputSemanticProperties.java
index 1a6841d..f1d098a 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SingleInputSemanticProperties.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/SingleInputSemanticProperties.java
@@ -49,13 +49,13 @@ public class SingleInputSemanticProperties extends SemanticProperties {
* @param destinationField the position in the destination record(s)
*/
public void addForwardedField(int sourceField, int destinationField) {
- FieldSet fs;
- if((fs = this.forwardedFields.get(sourceField)) != null) {
- fs.add(destinationField);
- } else {
- fs = new FieldSet(destinationField);
- this.forwardedFields.put(sourceField, fs);
+ FieldSet old = this.forwardedFields.get(sourceField);
+ if (old == null) {
+ old = FieldSet.EMPTY_SET;
}
+
+ FieldSet fs = old.addField(destinationField);
+ this.forwardedFields.put(sourceField, fs);
}
/**
@@ -67,13 +67,13 @@ public class SingleInputSemanticProperties extends SemanticProperties {
* @param destinationFields the position in the destination record(s)
*/
public void addForwardedField(int sourceField, FieldSet destinationFields) {
- FieldSet fs;
- if((fs = this.forwardedFields.get(sourceField)) != null) {
- fs.addAll(destinationFields);
- } else {
- fs = new FieldSet(destinationFields);
- this.forwardedFields.put(sourceField, fs);
+ FieldSet old = this.forwardedFields.get(sourceField);
+ if (old == null) {
+ old = FieldSet.EMPTY_SET;
}
+
+ FieldSet fs = old.addFields(destinationFields);
+ this.forwardedFields.put(sourceField, fs);
}
/**
@@ -105,10 +105,10 @@ public class SingleInputSemanticProperties extends SemanticProperties {
* @param readFields the position(s) in the source record(s)
*/
public void addReadFields(FieldSet readFields) {
- if(this.readFields == null) {
- this.readFields = new FieldSet(readFields);
+ if (this.readFields == null) {
+ this.readFields = readFields;
} else {
- this.readFields.addAll(readFields);
+ this.readFields = this.readFields.addFields(readFields);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/202aa4a7/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/util/FieldList.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/util/FieldList.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/util/FieldList.java
index 96b0f34..5b7b8da 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/util/FieldList.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/util/FieldList.java
@@ -14,42 +14,106 @@
package eu.stratosphere.api.common.operators.util;
import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Collections;
import java.util.List;
+import org.apache.commons.lang3.Validate;
/**
- *
+ * Immutable ordered list of fields IDs.
*/
public class FieldList extends FieldSet {
+ public static final FieldList EMPTY_LIST = new FieldList();
+
+
public FieldList() {
- super();
+ super(Collections.<Integer>emptyList());
+ }
+
+ public FieldList(int fieldId) {
+ super(Collections.singletonList(fieldId));
}
- public FieldList(int columnIndex) {
- this();
- add(columnIndex);
+ public FieldList(Integer fieldId) {
+ super(Collections.singletonList(Validate.notNull(fieldId, "The fields ID must not be null.")));
}
public FieldList(int... columnIndexes) {
- this();
- addAll(columnIndexes);
+ super (fromInts(columnIndexes));
+ }
+
+ private FieldList(List<Integer> fields) {
+ super(fields);
}
// --------------------------------------------------------------------------------------------
+ @Override
+ public FieldList addField(Integer fieldID) {
+ if (fieldID == null) {
+ throw new IllegalArgumentException("Field ID must not be null.");
+ }
+
+ if (size() == 0) {
+ return new FieldList(fieldID);
+ } else {
+ ArrayList<Integer> list = new ArrayList<Integer>(size() + 1);
+ list.addAll(this.collection);
+ list.add(fieldID);
+ return new FieldList(Collections.unmodifiableList(list));
+ }
+ }
+
+ @Override
+ public FieldList addFields(int... fieldIDs) {
+ if (fieldIDs == null || fieldIDs.length == 0) {
+ return this;
+ }
+ if (size() == 0) {
+ return new FieldList(fieldIDs);
+ } else {
+ ArrayList<Integer> list = new ArrayList<Integer>(size() + fieldIDs.length);
+ list.addAll(this.collection);
+ for (int i = 0; i < fieldIDs.length; i++) {
+ list.add(fieldIDs[i]);
+ }
+
+ return new FieldList(Collections.unmodifiableList(list));
+ }
+ }
+
+ @Override
+ public FieldList addFields(FieldSet set) {
+ if (set == null) {
+ throw new IllegalArgumentException("FieldSet to add must not be null.");
+ }
+
+ if (set.size() == 0) {
+ return this;
+ }
+ else if (size() == 0 && set instanceof FieldList) {
+ return (FieldList) set;
+ }
+ else {
+ ArrayList<Integer> list = new ArrayList<Integer>(size() + set.size());
+ list.addAll(this.collection);
+ list.addAll(set.collection);
+ return new FieldList(Collections.unmodifiableList(list));
+ }
+ }
+
public Integer get(int pos) {
return get().get(pos);
}
+ @Override
public FieldList toFieldList() {
return this;
}
// --------------------------------------------------------------------------------------------
-
@Override
public boolean isValidSubset(FieldSet set) {
if (set instanceof FieldList) {
@@ -90,11 +154,6 @@ public class FieldList extends FieldSet {
}
// --------------------------------------------------------------------------------------------
-
- @Override
- protected Collection<Integer> initCollection() {
- return new ArrayList<Integer>();
- }
@Override
protected String getDescriptionPrefix() {
@@ -109,4 +168,16 @@ public class FieldList extends FieldSet {
private List<Integer> get() {
return (List<Integer>) this.collection;
}
+
+ private static final List<Integer> fromInts(int... ints) {
+ if (ints == null || ints.length == 0) {
+ return Collections.emptyList();
+ } else {
+ ArrayList<Integer> al = new ArrayList<Integer>(ints.length);
+ for (int i = 0; i < ints.length; i++) {
+ al.add(ints[i]);
+ }
+ return Collections.unmodifiableList(al);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/202aa4a7/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/util/FieldSet.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/util/FieldSet.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/util/FieldSet.java
index 3738319..a1d1b03 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/util/FieldSet.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/util/FieldSet.java
@@ -15,67 +15,153 @@ package eu.stratosphere.api.common.operators.util;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
/**
- *
+ * Immutable unordered collection of fields IDs.
*/
public class FieldSet implements Iterable<Integer> {
+ public static final FieldSet EMPTY_SET = new FieldSet();
+
protected final Collection<Integer> collection;
// --------------------------------------------------------------------------------------------
+ /**
+ * Creates a new empty set of fields.
+ */
public FieldSet() {
- this.collection = initCollection();
+ this.collection = Collections.emptySet();
}
- public FieldSet(int columnIndex) {
- this();
- add(columnIndex);
+ /**
+ * Creates a set with one field.
+ *
+ * @param fieldID The id of the field.
+ */
+ public FieldSet(Integer fieldID) {
+ if (fieldID == null) {
+ throw new IllegalArgumentException("Field ID must not be null.");
+ }
+
+ this.collection = Collections.singleton(fieldID);
}
- public FieldSet(int[] columnIndexes) {
- this();
- addAll(columnIndexes);
+ /**
+ * Creates a set with the given fields.
+ *
+ * @param fieldIDs The IDs of the fields.
+ */
+ public FieldSet(int... fieldIDs) {
+ if (fieldIDs == null || fieldIDs.length == 0) {
+ this.collection = Collections.emptySet();
+ } else {
+ HashSet<Integer> set = new HashSet<Integer>(2 * fieldIDs.length);
+ for (int i = 0; i < fieldIDs.length; i++) {
+ set.add(fieldIDs[i]);
+ }
+
+ this.collection = Collections.unmodifiableSet(set);
+ }
}
- public FieldSet(Collection<Integer> o) {
- this();
- addAll(o);
+ /**
+ * Creates a set with the given fields.
+ * <p>
+ * The resulting set is identical to the one built by the varargs constructor: the {@code marker}
+ * argument is never read. NOTE(review): this overload presumably exists only to give callers that
+ * hold a plain {@code int[]} (e.g. non-Java callers) an unambiguous non-varargs constructor —
+ * confirm against the call sites before relying on or removing it.
+ *
+ * @param fieldIDs The IDs of the fields. A null or empty array yields an empty set.
+ * @param marker Not read by this constructor.
+ */
+ public FieldSet(int[] fieldIDs, boolean marker) {
+ if (fieldIDs == null || fieldIDs.length == 0) {
+ this.collection = Collections.emptySet();
+ } else {
+ HashSet<Integer> set = new HashSet<Integer>(2 * fieldIDs.length);
+ for (int i = 0; i < fieldIDs.length; i++) {
+ set.add(fieldIDs[i]);
+ }
+
+ // wrap so the set cannot be mutated after construction (FieldSet is immutable)
+ this.collection = Collections.unmodifiableSet(set);
+ }
+ }
- public FieldSet(Collection<Integer> o1, Collection<Integer> o2) {
- this();
- addAll(o1);
- addAll(o2);
+ protected FieldSet(Collection<Integer> fields) {
+ this.collection = fields;
}
- public FieldSet(FieldSet toCopy) {
- this();
- addAll(toCopy);
+ /**
+ * @param fieldSet The first part of the new set, NOT NULL!
+ * @param fieldID The ID to be added, NOT NULL!
+ */
+ private FieldSet(FieldSet fieldSet, Integer fieldID) {
+ if (fieldSet.size() == 0) {
+ this.collection = Collections.singleton(fieldID);
+ }
+ else {
+ HashSet<Integer> set = new HashSet<Integer>(2 * (fieldSet.collection.size() + 1));
+ set.addAll(fieldSet.collection);
+ set.add(fieldID);
+ this.collection = Collections.unmodifiableSet(set);
+ }
}
- // --------------------------------------------------------------------------------------------
+ private FieldSet(FieldSet fieldSet, int... fieldIDs) {
+ if (fieldIDs == null || fieldIDs.length == 0) {
+ this.collection = fieldSet.collection;
+ } else {
+ HashSet<Integer> set = new HashSet<Integer>(2 * (fieldSet.collection.size() + fieldIDs.length));
+ set.addAll(fieldSet.collection);
+
+ for (int i = 0; i < fieldIDs.length; i++) {
+ set.add(fieldIDs[i]);
+ }
+
+ this.collection = Collections.unmodifiableSet(set);
+ }
+ }
- public void add(Integer columnIndex) {
- this.collection.add(columnIndex);
+ private FieldSet(FieldSet fieldSet1, FieldSet fieldSet2) {
+ if (fieldSet2.size() == 0) {
+ this.collection = fieldSet1.collection;
+ }
+ else if (fieldSet1.size() == 0) {
+ this.collection = fieldSet2.collection;
+ }
+ else {
+ HashSet<Integer> set = new HashSet<Integer>(2 * (fieldSet1.size() + fieldSet2.size()));
+ set.addAll(fieldSet1.collection);
+ set.addAll(fieldSet2.collection);
+ this.collection = Collections.unmodifiableSet(set);
+ }
}
-
- public void addAll(int[] columnIndexes) {
- for (int i = 0; i < columnIndexes.length; i++) {
- add(columnIndexes[i]);
+
+ // --------------------------------------------------------------------------------------------
+
+ public FieldSet addField(Integer fieldID) {
+ if (fieldID == null) {
+ throw new IllegalArgumentException("Field ID must not be null.");
}
+ return new FieldSet(this, fieldID);
}
- public void addAll(Collection<Integer> columnIndexes) {
- this.collection.addAll(columnIndexes);
+ public FieldSet addFields(int... fieldIDs) {
+ return new FieldSet(this, fieldIDs);
}
- public void addAll(FieldSet set) {
- for (Integer i : set) {
- add(i);
+ public FieldSet addFields(FieldSet set) {
+ if (set == null) {
+ throw new IllegalArgumentException("FieldSet to add must not be null.");
+ }
+
+ if (set.size() == 0) {
+ return this;
+ }
+ else if (this.size() == 0) {
+ return set;
+ }
+ else {
+ return new FieldSet(this, set);
}
}
@@ -87,18 +173,28 @@ public class FieldSet implements Iterable<Integer> {
return this.collection.size();
}
-
@Override
public Iterator<Integer> iterator() {
return this.collection.iterator();
}
+ /**
+ * Turns the FieldSet into an ordered FieldList.
+ *
+ * @return An ordered FieldList.
+ */
public FieldList toFieldList() {
int[] pos = toArray();
Arrays.sort(pos);
return new FieldList(pos);
}
+ /**
+ * Transforms the field set into an array of field IDs. Whether the IDs are ordered
+ * or unordered depends on the specific subclass of the field set.
+ *
+ * @return An array of all contained field IDs.
+ */
public int[] toArray() {
int[] a = new int[this.collection.size()];
int i = 0;
@@ -134,13 +230,11 @@ public class FieldSet implements Iterable<Integer> {
// --------------------------------------------------------------------------------------------
-
@Override
public int hashCode() {
return this.collection.hashCode();
}
-
@Override
public boolean equals(Object obj) {
if (obj == null) {
@@ -171,18 +265,18 @@ public class FieldSet implements Iterable<Integer> {
}
+ /**
+ * Since instances of FieldSet are strictly immutable, this method does not actually clone;
+ * it simply returns the original instance.
+ *
+ * @return This object's reference, unmodified.
+ */
public FieldSet clone() {
- FieldSet set = new FieldSet();
- set.addAll(this.collection);
- return set;
+ return this;
}
// --------------------------------------------------------------------------------------------
- protected Collection<Integer> initCollection() {
- return new HashSet<Integer>();
- }
-
protected String getDescriptionPrefix() {
return "(";
}
@@ -190,6 +284,4 @@ public class FieldSet implements Iterable<Integer> {
protected String getDescriptionSuffix() {
return ")";
}
-
- public static final FieldSet EMPTY_SET = new FieldSet();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/202aa4a7/stratosphere-core/src/test/java/eu/stratosphere/api/common/operators/util/FieldListTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/test/java/eu/stratosphere/api/common/operators/util/FieldListTest.java b/stratosphere-core/src/test/java/eu/stratosphere/api/common/operators/util/FieldListTest.java
new file mode 100644
index 0000000..fe772c6
--- /dev/null
+++ b/stratosphere-core/src/test/java/eu/stratosphere/api/common/operators/util/FieldListTest.java
@@ -0,0 +1,105 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.api.common.operators.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.junit.Test;
+
+public class FieldListTest {
+
+ @Test
+ public void testFieldListConstructors() {
+ check(new FieldList());
+ check(FieldList.EMPTY_LIST);
+ check(new FieldList(14), 14);
+ check(new FieldList(new Integer(3)), 3);
+ check(new FieldList(7, 4, 1), 7, 4, 1);
+ check(new FieldList(7, 4, 1, 4, 7, 1, 4, 2), 7, 4, 1, 4, 7, 1, 4, 2);
+ }
+
+ @Test
+ public void testFieldListAdds() {
+ check(new FieldList().addField(1).addField(2), 1, 2);
+ check(FieldList.EMPTY_LIST.addField(3).addField(2), 3, 2);
+ check(new FieldList(13).addFields(new FieldList(17, 31, 42)), 13, 17, 31, 42);
+ check(new FieldList(14).addFields(new FieldList(17)), 14, 17);
+ check(new FieldList(3).addFields(2, 8, 5, 7), 3, 2, 8, 5, 7);
+ }
+
+ @Test
+ public void testImmutability() {
+ FieldList s1 = new FieldList();
+ FieldList s2 = new FieldList(5);
+ FieldList s3 = new FieldList(new Integer(7));
+ FieldList s4 = new FieldList(5, 4, 7, 6);
+
+ s1.addFields(s2).addFields(s3);
+ s2.addFields(s4);
+ s4.addFields(s1);
+
+ s1.addField(new Integer(14));
+ s2.addFields(78, 13, 66, 3);
+
+ assertEquals(0, s1.size());
+ assertEquals(1, s2.size());
+ assertEquals(1, s3.size());
+ assertEquals(4, s4.size());
+ }
+
+ @Test
+ public void testAddSetToList() {
+ check(new FieldList().addFields(new FieldSet(1)).addFields(2), 1, 2);
+ check(new FieldList().addFields(1).addFields(new FieldSet(2)), 1, 2);
+ check(new FieldList().addFields(new FieldSet(2)), 2);
+ }
+
+ private static void check(FieldList set, int... elements) { // asserts size, membership, toArray() and iteration contents
+ if (elements == null) { // defensive only: a varargs call without extra arguments passes an empty array, not null
+ assertEquals(0, set.size());
+ return;
+ }
+
+ assertEquals(elements.length, set.size());
+
+ // test contains: every expected element must be reported as contained (result is asserted, not discarded)
+ for (int i : elements) {
+ assertTrue(set.contains(i));
+ }
+
+ // test to array: a FieldList is expected to keep the given order, so arrays must match element-wise
+ {
+ int[] arr = set.toArray();
+ assertTrue(Arrays.equals(arr, elements));
+ }
+ // test iteration: exactly size() elements, in the same order as expected
+ {
+ int[] fromIter = new int[set.size()];
+ Iterator<Integer> iter = set.iterator();
+
+ for (int i = 0; i < fromIter.length; i++) {
+ fromIter[i] = iter.next();
+ }
+ assertFalse(iter.hasNext());
+ assertTrue(Arrays.equals(fromIter, elements));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/202aa4a7/stratosphere-core/src/test/java/eu/stratosphere/api/common/operators/util/FieldSetTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/test/java/eu/stratosphere/api/common/operators/util/FieldSetTest.java b/stratosphere-core/src/test/java/eu/stratosphere/api/common/operators/util/FieldSetTest.java
new file mode 100644
index 0000000..1a147fc
--- /dev/null
+++ b/stratosphere-core/src/test/java/eu/stratosphere/api/common/operators/util/FieldSetTest.java
@@ -0,0 +1,110 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.api.common.operators.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.junit.Test;
+
+public class FieldSetTest {
+
+ @Test
+ public void testFieldSetConstructors() {
+ check(new FieldSet());
+ check(FieldSet.EMPTY_SET);
+ check(new FieldSet(14), 14);
+ check(new FieldSet(new Integer(3)), 3);
+ check(new FieldSet(7, 4, 1), 1, 4, 7);
+ check(new FieldSet(7, 4, 1, 4, 7, 1, 4, 2), 1, 4, 2, 7);
+ }
+
+ @Test
+ public void testFieldSetAdds() {
+ check(new FieldSet().addField(1).addField(2), 1, 2);
+ check(FieldSet.EMPTY_SET.addField(3).addField(2), 3, 2);
+ check(new FieldSet(13).addFields(new FieldSet(17, 31, 42)), 17, 13, 42, 31);
+ check(new FieldSet(14).addFields(new FieldSet(17)), 17, 14);
+ check(new FieldSet(3).addFields(2, 8, 5, 7), 3, 2, 8, 5, 7);
+ check(new FieldSet().addFields(new FieldSet()));
+ check(new FieldSet().addFields(new FieldSet(3, 4)), 4, 3);
+ check(new FieldSet(5, 1).addFields(new FieldSet()), 5, 1);
+ }
+
+ @Test
+ public void testImmutability() {
+ FieldSet s1 = new FieldSet();
+ FieldSet s2 = new FieldSet(5);
+ FieldSet s3 = new FieldSet(new Integer(7));
+ FieldSet s4 = new FieldSet(5, 4, 7, 6);
+
+ s1.addFields(s2).addFields(s3);
+ s2.addFields(s4);
+ s4.addFields(s1);
+
+ s1.addField(new Integer(14));
+ s2.addFields(78, 13, 66, 3);
+
+ assertEquals(0, s1.size());
+ assertEquals(1, s2.size());
+ assertEquals(1, s3.size());
+ assertEquals(4, s4.size());
+ }
+
+ @Test
+ public void testAddListToSet() {
+ check(new FieldSet().addField(1).addFields(new FieldList(14, 3, 1)), 1, 3, 14);
+ }
+
+ private static void check(FieldSet set, int... elements) { // asserts size, membership, toArray() and iteration contents
+ if (elements == null) { // defensive only: a varargs call without extra arguments passes an empty array, not null
+ assertEquals(0, set.size());
+ return;
+ }
+
+ assertEquals(elements.length, set.size());
+
+ // test contains: every expected element must be reported as contained (result is asserted, not discarded)
+ for (int i : elements) {
+ assertTrue(set.contains(i));
+ }
+
+ Arrays.sort(elements);
+
+ // test to array: a FieldSet has no guaranteed order, so compare after sorting
+ {
+ int[] arr = set.toArray();
+ Arrays.sort(arr);
+ assertTrue(Arrays.equals(arr, elements));
+ }
+ // test iteration: exactly size() elements, compared after sorting
+ {
+ int[] fromIter = new int[set.size()];
+ Iterator<Integer> iter = set.iterator();
+
+ for (int i = 0; i < fromIter.length; i++) {
+ fromIter[i] = iter.next();
+ }
+ assertFalse(iter.hasNext());
+ Arrays.sort(fromIter);
+ assertTrue(Arrays.equals(fromIter, elements));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/202aa4a7/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/SemanticPropUtil.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/SemanticPropUtil.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/SemanticPropUtil.java
index 74b1871..4d767b0 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/SemanticPropUtil.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/SemanticPropUtil.java
@@ -294,14 +294,14 @@ public class SemanticPropUtil {
}
matcher = PATTERN_DIGIT.matcher(s);
- FieldSet fs = new FieldSet();
+ FieldSet fs = FieldSet.EMPTY_SET;
while (matcher.find()) {
int field = Integer.valueOf(matcher.group());
if (!isValidField(outType, field) || !isValidField(inType, field)) {
throw new IndexOutOfBoundsException("Annotation: Field " + field + " not available in the output tuple.");
}
- fs.add(field);
+ fs = fs.addField(field);
}
return fs;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/202aa4a7/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/CompilerHints.scala
----------------------------------------------------------------------
diff --git a/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/CompilerHints.scala b/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/CompilerHints.scala
index b9f7230..af6f2d6 100644
--- a/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/CompilerHints.scala
+++ b/stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/CompilerHints.scala
@@ -25,27 +25,18 @@ import eu.stratosphere.types.Record
case class KeyCardinality(key: FieldSelector, isUnique: Boolean, distinctCount: Option[Long], avgNumRecords: Option[Float]) {
- private class RefreshableFieldSet extends PactFieldSet {
- def refresh(indexes: Set[Int]) = {
- this.collection.clear()
- for (index <- indexes)
- this.add(index)
- }
- }
-
- @transient private var pactFieldSets = collection.mutable.Map[Operator[Record] with ScalaOperator[_, _], RefreshableFieldSet]()
+ @transient private var pactFieldSets = collection.mutable.Map[Operator[Record] with ScalaOperator[_, _], PactFieldSet]()
def getPactFieldSet(contract: Operator[Record] with ScalaOperator[_, _]): PactFieldSet = {
if (pactFieldSets == null)
- pactFieldSets = collection.mutable.Map[Operator[Record] with ScalaOperator[_, _], RefreshableFieldSet]()
+ pactFieldSets = collection.mutable.Map[Operator[Record] with ScalaOperator[_, _], PactFieldSet]()
val keyCopy = key.copy
contract.getUDF.attachOutputsToInputs(keyCopy.inputFields)
- val keySet = keyCopy.selectedFields.toIndexSet
+ val keySet = keyCopy.selectedFields.toIndexSet.toArray
- val fieldSet = pactFieldSets.getOrElseUpdate(contract, new RefreshableFieldSet())
- fieldSet.refresh(keySet)
+ val fieldSet = pactFieldSets.getOrElseUpdate(contract, new PactFieldSet(keySet, true))
fieldSet
}
}
[6/6] git commit: [FLINK-935] Fix compiler logic that pushed work out
of the iteration loop.
Posted by se...@apache.org.
[FLINK-935] Fix compiler logic that pushed work out of the iteration loop.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/31a37393
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/31a37393
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/31a37393
Branch: refs/heads/master
Commit: 31a373930bce27a0d4abc2cbcff4284d5af9002a
Parents: 229754d
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jun 13 19:56:26 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jun 17 02:07:37 2014 +0200
----------------------------------------------------------------------
.../compiler/costs/CostEstimator.java | 2 -
.../compiler/dag/BulkIterationNode.java | 12 +-
.../compiler/dag/BulkPartialSolutionNode.java | 2 +-
.../stratosphere/compiler/dag/DataSinkNode.java | 8 +-
.../compiler/dag/SingleInputNode.java | 11 +-
.../stratosphere/compiler/dag/TwoInputNode.java | 16 +-
.../compiler/dag/WorksetIterationNode.java | 2 +-
.../RequestedGlobalProperties.java | 46 +---
.../RequestedLocalProperties.java | 14 +-
.../eu/stratosphere/compiler/plan/Channel.java | 76 +++---
.../plandump/PlanJSONDumpGenerator.java | 3 -
.../plantranslate/NepheleJobGraphGenerator.java | 1 -
.../pact/compiler/DOPChangeTest.java | 4 +-
.../pact/compiler/IterationsCompilerTest.java | 265 +++++++++++++++++++
.../MultipleIterationsCompilerTest.java | 221 ----------------
.../WorksetEmptyConvergenceCriterion.java | 2 +
.../pact/runtime/shipping/OutputEmitter.java | 2 -
.../runtime/shipping/RecordOutputEmitter.java | 2 -
.../pact/runtime/shipping/ShipStrategyType.java | 6 -
.../test/util/testjar/KMeansForTest.java | 2 +-
20 files changed, 345 insertions(+), 352 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java
index 492204b..11fb45b 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java
@@ -94,8 +94,6 @@ public abstract class CostEstimator {
case FORWARD:
// costs.addHeuristicNetworkCost(channel.getMaxDepth());
break;
- case PARTITION_LOCAL_HASH:
- break;
case PARTITION_RANDOM:
addRandomPartitioningCost(channel, costs);
break;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
index c547da9..b60f427 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
@@ -279,13 +279,17 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
// 3) Get the alternative plans
List<PlanNode> candidates = this.nextPartialSolution.getAlternativePlans(estimator);
- // 4) Throw away all that are not compatible with the properties currently requested to the
- // initial partial solution
+ // 4) Make sure that the beginning of the step function does not assume properties that
+ // are not also produced by the end of the step function.
+
for (Iterator<PlanNode> planDeleter = candidates.iterator(); planDeleter.hasNext(); ) {
PlanNode candidate = planDeleter.next();
- if (!(globPropsReq.isMetBy(candidate.getGlobalProperties()) && locPropsReq.isMetBy(candidate.getLocalProperties()))) {
- planDeleter.remove();
+
+ // quick-check if the properties at the end of the step function are the same as at the beginning
+ if (candidate.getGlobalProperties().equals(pspn.getGlobalProperties()) && candidate.getLocalProperties().equals(pspn.getLocalProperties())) {
+ continue;
}
+ planDeleter.remove();
}
// 5) Create a candidate for the Iteration Node for every remaining plan of the step function.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkPartialSolutionNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkPartialSolutionNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkPartialSolutionNode.java
index 449a2fc..592768b 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkPartialSolutionNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkPartialSolutionNode.java
@@ -43,7 +43,7 @@ public class BulkPartialSolutionNode extends AbstractPartialSolutionNode {
if (this.cachedPlans != null) {
throw new IllegalStateException();
} else {
- this.cachedPlans = Collections.<PlanNode>singletonList(new BulkPartialSolutionPlanNode(this, "BulkPartialSolution("+this.getPactContract().getName()+")", gProps, lProps, initialInput));
+ this.cachedPlans = Collections.<PlanNode>singletonList(new BulkPartialSolutionPlanNode(this, "PartialSolution ("+this.getPactContract().getName()+")", gProps, lProps, initialInput));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
index 3f7b224..15c7670 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
@@ -31,7 +31,6 @@ import eu.stratosphere.compiler.dataproperties.RequestedLocalProperties;
import eu.stratosphere.compiler.plan.Channel;
import eu.stratosphere.compiler.plan.PlanNode;
import eu.stratosphere.compiler.plan.SinkPlanNode;
-import eu.stratosphere.pact.runtime.task.util.LocalStrategy;
import eu.stratosphere.util.Visitor;
/**
@@ -210,12 +209,7 @@ public class DataSinkNode extends OptimizerNode {
for (RequestedLocalProperties lp : ips.getLocalProperties()) {
Channel c = new Channel(p);
gp.parameterizeChannel(c, globalDopChange, localDopChange);
-
- if (lp.isMetBy(c.getLocalPropertiesAfterShippingOnly())) {
- c.setLocalStrategy(LocalStrategy.NONE);
- } else {
- lp.parameterizeChannel(c);
- }
+ lp.parameterizeChannel(c);
// no need to check whether the created properties meet what we need in case
// of ordering or global ordering, because the only interesting properties we have
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
index 2a9e5c6..fd4b76a 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
@@ -42,7 +42,6 @@ import eu.stratosphere.compiler.plan.SingleInputPlanNode;
import eu.stratosphere.compiler.util.NoOpUnaryUdfOp;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
-import eu.stratosphere.pact.runtime.task.util.LocalStrategy;
import eu.stratosphere.util.Visitor;
/**
@@ -357,21 +356,17 @@ public abstract class SingleInputNode extends OptimizerNode {
protected void addLocalCandidates(Channel template, List<Set<? extends NamedChannel>> broadcastPlanChannels, RequestedGlobalProperties rgps,
List<PlanNode> target, CostEstimator estimator)
{
- final LocalProperties lp = template.getLocalPropertiesAfterShippingOnly();
for (RequestedLocalProperties ilp : this.inConn.getInterestingProperties().getLocalProperties()) {
final Channel in = template.clone();
- if (ilp.isMetBy(lp)) {
- in.setLocalStrategy(LocalStrategy.NONE);
- } else {
- ilp.parameterizeChannel(in);
- }
+ ilp.parameterizeChannel(in);
// instantiate a candidate, if the instantiated local properties meet one possible local property set
+ outer:
for (OperatorDescriptorSingle dps: getPossibleProperties()) {
for (RequestedLocalProperties ilps : dps.getPossibleLocalProperties()) {
if (ilps.isMetBy(in.getLocalProperties())) {
instantiateCandidate(dps, in, broadcastPlanChannels, target, estimator, rgps, ilp);
- break;
+ break outer;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
index 5046ceb..cccbeba 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
@@ -51,7 +51,6 @@ import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
import eu.stratosphere.pact.runtime.task.DamBehavior;
import eu.stratosphere.pact.runtime.task.DriverStrategy;
-import eu.stratosphere.pact.runtime.task.util.LocalStrategy;
import eu.stratosphere.util.Visitor;
/**
@@ -488,24 +487,13 @@ public abstract class TwoInputNode extends OptimizerNode {
RequestedGlobalProperties rgps1, RequestedGlobalProperties rgps2,
List<PlanNode> target, LocalPropertiesPair[] validLocalCombinations, CostEstimator estimator)
{
- final LocalProperties lp1 = template1.getLocalPropertiesAfterShippingOnly();
- final LocalProperties lp2 = template2.getLocalPropertiesAfterShippingOnly();
-
for (RequestedLocalProperties ilp1 : this.input1.getInterestingProperties().getLocalProperties()) {
final Channel in1 = template1.clone();
- if (ilp1.isMetBy(lp1)) {
- in1.setLocalStrategy(LocalStrategy.NONE);
- } else {
- ilp1.parameterizeChannel(in1);
- }
+ ilp1.parameterizeChannel(in1);
for (RequestedLocalProperties ilp2 : this.input2.getInterestingProperties().getLocalProperties()) {
final Channel in2 = template2.clone();
- if (ilp2.isMetBy(lp2)) {
- in2.setLocalStrategy(LocalStrategy.NONE);
- } else {
- ilp2.parameterizeChannel(in2);
- }
+ ilp2.parameterizeChannel(in2);
allPossibleLoop:
for (OperatorDescriptorDual dps: this.possibleProperties) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
index be02e01..94580ad 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
@@ -397,7 +397,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
List<UnclosedBranchDescriptor> result2 = getSecondPredecessorNode().getBranchesForParent(getSecondIncomingConnection());
ArrayList<UnclosedBranchDescriptor> inputsMerged1 = new ArrayList<UnclosedBranchDescriptor>();
- mergeLists(result1, result2, inputsMerged1);
+ mergeLists(result1, result2, inputsMerged1); // this method also sets which branches are joined here (in the head)
addClosedBranches(getSingleRootOfStepFunction().closedBranchingNodes);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java
index 06d7371..3935f5e 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java
@@ -17,8 +17,6 @@ import eu.stratosphere.api.common.distributions.DataDistribution;
import eu.stratosphere.api.common.operators.Ordering;
import eu.stratosphere.api.common.operators.util.FieldSet;
import eu.stratosphere.compiler.CompilerException;
-import eu.stratosphere.compiler.costs.CostEstimator;
-import eu.stratosphere.compiler.costs.Costs;
import eu.stratosphere.compiler.dag.OptimizerNode;
import eu.stratosphere.compiler.plan.Channel;
import eu.stratosphere.compiler.util.Utils;
@@ -31,8 +29,8 @@ import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
* Currently, the properties are the following: A partitioning type (ANY, HASH, RANGE), and EITHER an ordering (for range partitioning)
* or an FieldSet with the hash partitioning columns.
*/
-public final class RequestedGlobalProperties implements Cloneable
-{
+public final class RequestedGlobalProperties implements Cloneable {
+
private PartitioningProperty partitioning; // the type partitioning
private FieldSet partitioningFields; // the fields which are partitioned
@@ -230,20 +228,12 @@ public final class RequestedGlobalProperties implements Cloneable
final GlobalProperties inGlobals = channel.getSource().getGlobalProperties();
// if we have no global parallelism change, check if we have already compatible global properties
- if (!globalDopChange && isMetBy(inGlobals)) {
- if (localDopChange) {
- // if the local degree of parallelism changes, we need to adjust
- if (inGlobals.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) {
- // to preserve the hash partitioning, we need to locally hash re-partition
- channel.setShipStrategy(ShipStrategyType.PARTITION_LOCAL_HASH, inGlobals.getPartitioningFields());
- return;
- }
- // else fall though
- } else {
- // we meet already everything, so go forward
- channel.setShipStrategy(ShipStrategyType.FORWARD);
- return;
- }
+ if (!globalDopChange && !localDopChange && isMetBy(inGlobals)) {
+ channel.setRequiredGlobalProps(this);
+
+ // we meet already everything, so go forward
+ channel.setShipStrategy(ShipStrategyType.FORWARD);
+ return;
}
// if we fall through the conditions until here, we need to re-establish
@@ -266,26 +256,6 @@ public final class RequestedGlobalProperties implements Cloneable
throw new CompilerException();
}
}
-
- public void addMinimalRequiredCosts(Costs to, CostEstimator estimator, OptimizerNode source, OptimizerNode target) {
- if (this.partitioning == null || this.partitioning == PartitioningProperty.RANDOM) {
- return;
- } else {
- switch (this.partitioning) {
- case FULL_REPLICATION:
- estimator.addBroadcastCost(source, target.getDegreeOfParallelism(), to);
- case ANY_PARTITIONING:
- case HASH_PARTITIONED:
- estimator.addHashPartitioningCost(source, to);
- break;
- case RANGE_PARTITIONED:
- estimator.addRangePartitionCost(source, to);
- break;
- default:
- throw new CompilerException();
- }
- }
- }
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java
index 7e67f5f..e714cea 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java
@@ -187,13 +187,23 @@ public class RequestedLocalProperties implements Cloneable {
* @param channel The channel to parameterize.
*/
public void parameterizeChannel(Channel channel) {
- if (this.ordering != null) {
+ LocalProperties current = channel.getLocalProperties();
+
+ if (isMetBy(current)) {
+ // we are met. record that this is needed.
+ channel.setRequiredLocalProps(this);
+ }
+ else if (this.ordering != null) {
channel.setLocalStrategy(LocalStrategy.SORT, this.ordering.getInvolvedIndexes(), this.ordering.getFieldSortDirections());
- } else if (this.groupedFields != null) {
+ }
+ else if (this.groupedFields != null) {
boolean[] dirs = new boolean[this.groupedFields.size()];
Arrays.fill(dirs, true);
channel.setLocalStrategy(LocalStrategy.SORT, Utils.createOrderedFromSet(this.groupedFields), dirs);
}
+ else {
+ channel.setLocalStrategy(LocalStrategy.NONE);
+ }
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java
index 3377caf..1cd6a36 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java
@@ -22,6 +22,8 @@ import eu.stratosphere.compiler.dag.EstimateProvider;
import eu.stratosphere.compiler.dag.TempMode;
import eu.stratosphere.compiler.dataproperties.GlobalProperties;
import eu.stratosphere.compiler.dataproperties.LocalProperties;
+import eu.stratosphere.compiler.dataproperties.RequestedGlobalProperties;
+import eu.stratosphere.compiler.dataproperties.RequestedLocalProperties;
import eu.stratosphere.compiler.plandump.DumpableConnection;
import eu.stratosphere.compiler.util.Utils;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
@@ -48,6 +50,10 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
private boolean[] localSortOrder;
+ private RequestedGlobalProperties requiredGlobalProps;
+
+ private RequestedLocalProperties requiredLocalProps;
+
private GlobalProperties globalProps;
private LocalProperties localProps;
@@ -329,6 +335,22 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
// --------------------------------------------------------------------------------------------
+ public RequestedGlobalProperties getRequiredGlobalProps() {
+ return requiredGlobalProps;
+ }
+
+ public void setRequiredGlobalProps(RequestedGlobalProperties requiredGlobalProps) {
+ this.requiredGlobalProps = requiredGlobalProps;
+ }
+
+ public RequestedLocalProperties getRequiredLocalProps() {
+ return requiredLocalProps;
+ }
+
+ public void setRequiredLocalProps(RequestedLocalProperties requiredLocalProps) {
+ this.requiredLocalProps = requiredLocalProps;
+ }
+
public GlobalProperties getGlobalProperties() {
if (this.globalProps == null) {
this.globalProps = this.source.getGlobalProperties().clone();
@@ -348,19 +370,6 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
case PARTITION_RANDOM:
this.globalProps.reset();
break;
- case PARTITION_LOCAL_HASH:
- if (getSource().getGlobalProperties().isPartitionedOnFields(this.shipKeys)) {
- // after a local hash partitioning, we can only state that the data is somehow
- // partitioned. even if we had a hash partitioning before over 8 partitions,
- // locally rehashing that onto 16 partitions (each one partition into two) gives you
- // a different result than directly hashing to 16 partitions. the hash-partitioning
- // property is only valid, if the assumed built in hash function is directly used.
- // hence, we can only state that this is some form of partitioning.
- this.globalProps.setAnyPartitioning(this.shipKeys);
- } else {
- this.globalProps.reset();
- }
- break;
case NONE:
throw new CompilerException("Cannot produce GlobalProperties before ship strategy is set.");
}
@@ -371,12 +380,13 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
public LocalProperties getLocalProperties() {
if (this.localProps == null) {
- this.localProps = getLocalPropertiesAfterShippingOnly().clone();
+ computeLocalPropertiesAfterShippingOnly();
switch (this.localStrategy) {
case NONE:
break;
case SORT:
case COMBININGSORT:
+ this.localProps = new LocalProperties();
this.localProps.setOrdering(Utils.createOrdering(this.localKeys, this.localSortOrder));
break;
default:
@@ -387,25 +397,19 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
return this.localProps;
}
- public LocalProperties getLocalPropertiesAfterShippingOnly() {
- if (this.shipStrategy == ShipStrategyType.FORWARD) {
- return this.source.getLocalProperties();
- } else {
- final LocalProperties props = this.source.getLocalProperties().clone();
- switch (this.shipStrategy) {
- case BROADCAST:
- case PARTITION_HASH:
- case PARTITION_RANGE:
- case PARTITION_RANDOM:
- props.reset();
- break;
- case PARTITION_LOCAL_HASH:
- case FORWARD:
- break;
- case NONE:
- throw new CompilerException("ShipStrategy has not yet been set.");
- }
- return props;
+ private void computeLocalPropertiesAfterShippingOnly() { // sets this.localProps from the ship strategy alone, before any local strategy is applied
+ switch (this.shipStrategy) {
+ case BROADCAST:
+ case PARTITION_HASH:
+ case PARTITION_RANGE:
+ case PARTITION_RANDOM:
+ this.localProps = new LocalProperties(); // these strategies keep none of the source's local properties
+ break;
+ case FORWARD:
+ this.localProps = this.source.getLocalProperties(); // NOTE(review): shares the source's mutable instance (no clone); confirm nothing mutates it
+ break;
+ case NONE:
+ throw new CompilerException("ShipStrategy has not yet been set.");
 }
 }
@@ -423,8 +427,7 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
// some strategies globally reestablish properties
switch (this.shipStrategy) {
case FORWARD:
- case PARTITION_LOCAL_HASH:
- throw new CompilerException("Cannot use FORWARD or LOCAL_HASH strategy between operations " +
+ throw new CompilerException("Cannot use FORWARD strategy between operations " +
"with different number of parallel instances.");
case NONE: // excluded by sanity check. lust here for verification check completion
case BROADCAST:
@@ -452,8 +455,7 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
case FORWARD:
this.globalProps.reset();
return;
- case NONE: // excluded by sanity check. lust here for verification check completion
- case PARTITION_LOCAL_HASH:
+ case NONE: // excluded by sanity check. just here to silence compiler warnings check completion
case BROADCAST:
case PARTITION_HASH:
case PARTITION_RANGE:
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/PlanJSONDumpGenerator.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/PlanJSONDumpGenerator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/PlanJSONDumpGenerator.java
index 183ee2f..e89a18b 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/PlanJSONDumpGenerator.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/PlanJSONDumpGenerator.java
@@ -319,9 +319,6 @@ public class PlanJSONDumpGenerator {
case PARTITION_RANGE:
shipStrategy = "Range Partition";
break;
- case PARTITION_LOCAL_HASH:
- shipStrategy = "Hash Partition (local)";
- break;
case PARTITION_RANDOM:
shipStrategy = "Redistribute";
break;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
index 4817095..aebf2cf 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -1041,7 +1041,6 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
switch (channel.getShipStrategy()) {
case FORWARD:
- case PARTITION_LOCAL_HASH:
distributionPattern = DistributionPattern.POINTWISE;
channelType = ChannelType.NETWORK;
break;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/DOPChangeTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/DOPChangeTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/DOPChangeTest.java
index 9f53c30..273c42c 100644
--- a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/DOPChangeTest.java
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/DOPChangeTest.java
@@ -209,8 +209,8 @@ public class DOPChangeTest extends CompilerTestBase {
ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy();
- Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_LOCAL_HASH, mapIn);
- Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, reduceIn);
+ Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, mapIn);
+ Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_HASH, reduceIn);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/IterationsCompilerTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/IterationsCompilerTest.java
new file mode 100644
index 0000000..a123099
--- /dev/null
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/IterationsCompilerTest.java
@@ -0,0 +1,245 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.pact.compiler;
+
+import static org.junit.Assert.*;
+
+import java.util.Iterator;
+
+import org.junit.Test;
+
+import eu.stratosphere.api.common.Plan;
+import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.DeltaIteration;
+import eu.stratosphere.api.java.ExecutionEnvironment;
+import eu.stratosphere.api.java.IterativeDataSet;
+import eu.stratosphere.api.java.aggregation.Aggregations;
+import eu.stratosphere.api.java.functions.FlatMapFunction;
+import eu.stratosphere.api.java.functions.FunctionAnnotation.ConstantFields;
+import eu.stratosphere.api.java.functions.GroupReduceFunction;
+import eu.stratosphere.api.java.functions.JoinFunction;
+import eu.stratosphere.api.java.functions.MapFunction;
+import eu.stratosphere.api.java.tuple.Tuple1;
+import eu.stratosphere.api.java.tuple.Tuple2;
+import eu.stratosphere.compiler.plan.BulkIterationPlanNode;
+import eu.stratosphere.compiler.plan.Channel;
+import eu.stratosphere.compiler.plan.OptimizedPlan;
+import eu.stratosphere.compiler.plan.PlanNode;
+import eu.stratosphere.compiler.plan.WorksetIterationPlanNode;
+import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
+import eu.stratosphere.util.Collector;
+
+/**
+ * Optimizer tests for programs that contain bulk and delta iterations. The tests only build and
+ * compile programs; none of the user functions below is ever executed.
+ */
+@SuppressWarnings({"serial", "unchecked"})
+public class IterationsCompilerTest extends CompilerTestBase {
+
+ /**
+ * Bulk iteration, then a map, then a delta iteration: the delta iteration's first input must be
+ * hash partitioned and its second input must be temped in a pipeline-breaking way.
+ */
+ @Test
+ public void testTwoIterationsWithMapperInbetween() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(8);
+
+ DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
+ DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
+
+ DataSet<Tuple2<Long, Long>> bulkResult = doBulkIteration(verticesWithInitialId, edges);
+ DataSet<Tuple2<Long, Long>> mappedBulk = bulkResult.map(new DummyMap());
+ DataSet<Tuple2<Long, Long>> depResult = doDeltaIteration(mappedBulk, edges);
+ depResult.print();
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ WorksetIterationPlanNode wipn = getSingleSinkInput(op, WorksetIterationPlanNode.class);
+ assertEquals(ShipStrategyType.PARTITION_HASH, wipn.getInput1().getShipStrategy());
+ assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
+ }
+
+ /**
+ * Bulk iteration feeding a delta iteration directly; same expectations as with a mapper in between.
+ */
+ @Test
+ public void testTwoIterationsDirectlyChained() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(8);
+
+ DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
+ DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
+
+ DataSet<Tuple2<Long, Long>> bulkResult = doBulkIteration(verticesWithInitialId, edges);
+ DataSet<Tuple2<Long, Long>> depResult = doDeltaIteration(bulkResult, edges);
+ depResult.print();
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ WorksetIterationPlanNode wipn = getSingleSinkInput(op, WorksetIterationPlanNode.class);
+ assertEquals(ShipStrategyType.PARTITION_HASH, wipn.getInput1().getShipStrategy());
+ assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
+ }
+
+ /**
+ * Two delta iterations chained directly: the second iteration's first input is forwarded rather
+ * than re-partitioned, and its second input must be temped in a pipeline-breaking way.
+ */
+ @Test
+ public void testTwoWorksetIterationsDirectlyChained() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(8);
+
+ DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
+ DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
+
+ DataSet<Tuple2<Long, Long>> firstResult = doDeltaIteration(verticesWithInitialId, edges);
+ DataSet<Tuple2<Long, Long>> secondResult = doDeltaIteration(firstResult, edges);
+ secondResult.print();
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ WorksetIterationPlanNode wipn = getSingleSinkInput(op, WorksetIterationPlanNode.class);
+ assertEquals(ShipStrategyType.FORWARD, wipn.getInput1().getShipStrategy());
+ assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
+ }
+
+ /**
+ * Bulk iteration whose initial partial solution comes from a mapper that keeps field 0 constant;
+ * every channel leaving the partial solution node is expected to be hash partitioned.
+ */
+ @Test
+ public void testIterationPushingWorkOut() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple2<Long, Long>> input1 = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());
+ DataSet<Tuple2<Long, Long>> input2 = env.readCsvFile("/some/file/path").types(Long.class, Long.class);
+
+ doBulkIteration(input1, input2).print();
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ BulkIterationPlanNode bipn = getSingleSinkInput(op, BulkIterationPlanNode.class);
+ for (Channel c : bipn.getPartialSolutionPlanNode().getOutgoingChannels()) {
+ assertEquals(ShipStrategyType.PARTITION_HASH, c.getShipStrategy());
+ }
+ }
+
+ /**
+ * Asserts that the plan has exactly one sink and that the sink's input comes from a node of the
+ * expected type; returns that node.
+ */
+ private static <T> T getSingleSinkInput(OptimizedPlan op, Class<T> expectedType) {
+ assertEquals(1, op.getDataSinks().size());
+ PlanNode source = op.getDataSinks().iterator().next().getInput().getSource();
+ assertTrue("Unexpected node feeding the sink: " + source, expectedType.isInstance(source));
+ return expectedType.cast(source);
+ }
+
+ /**
+ * Builds a bulk iteration (at most 100 supersteps) over {@code vertices}. Each step joins the partial
+ * solution with {@code edges} on field 0, takes the minimum of field 1 per field-0 group, joins that
+ * back with the partial solution on field 0, and closes the iteration with the flat-mapped result.
+ */
+ public static DataSet<Tuple2<Long, Long>> doBulkIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {
+ // open a bulk iteration
+ IterativeDataSet<Tuple2<Long, Long>> iteration = vertices.iterate(100);
+
+ DataSet<Tuple2<Long, Long>> changes = iteration
+ .join(edges).where(0).equalTo(0).with(new Join222())
+ .groupBy(0).aggregate(Aggregations.MIN, 1)
+ .join(iteration).where(0).equalTo(0)
+ .flatMap(new FlatMapJoin());
+
+ // close the bulk iteration
+ return iteration.closeWith(changes);
+ }
+
+ /**
+ * Builds a delta iteration (at most 100 supersteps, key position 0) that starts from {@code vertices}
+ * as both initial solution set and initial workset. The same data set is passed twice to
+ * {@code closeWith}, i.e. it serves as both solution set delta and next workset.
+ */
+ public static DataSet<Tuple2<Long, Long>> doDeltaIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {
+ DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> depIteration = vertices.iterateDelta(vertices, 100, 0);
+
+ DataSet<Tuple1<Long>> candidates = depIteration.getWorkset().join(edges).where(0).equalTo(0)
+ .projectSecond(1).types(Long.class);
+
+ DataSet<Tuple1<Long>> grouped = candidates.groupBy(0).reduceGroup(new Reduce101());
+
+ DataSet<Tuple2<Long, Long>> candidatesDependencies =
+ grouped.join(edges).where(0).equalTo(1).projectSecond(0, 1).types(Long.class, Long.class);
+
+ DataSet<Tuple2<Long, Long>> verticesWithNewComponents =
+ candidatesDependencies.join(depIteration.getSolutionSet()).where(0).equalTo(0)
+ .with(new Join222())
+ .groupBy(0).aggregate(Aggregations.MIN, 1);
+
+ DataSet<Tuple2<Long, Long>> updatedComponentId =
+ verticesWithNewComponents.join(depIteration.getSolutionSet()).where(0).equalTo(0)
+ .flatMap(new FlatMapJoin());
+
+ return depIteration.closeWith(updatedComponentId, updatedComponentId);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // User functions for the test programs (the plans are only compiled, never executed)
+ // --------------------------------------------------------------------------------------------
+
+ public static final class Join222 extends JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+ @Override
+ public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
+ return null;
+ }
+ }
+
+ public static final class FlatMapJoin extends FlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
+
+ @Override
+ public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> value, Collector<Tuple2<Long, Long>> out) {}
+ }
+
+ public static final class DummyMap extends MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+ @Override
+ public Tuple2<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
+ return value;
+ }
+ }
+
+ @ConstantFields("0")
+ public static final class Reduce101 extends GroupReduceFunction<Tuple1<Long>, Tuple1<Long>> {
+
+ @Override
+ public void reduce(Iterator<Tuple1<Long>> values, Collector<Tuple1<Long>> out) {}
+ }
+
+ @ConstantFields("0")
+ public static final class DuplicateValue extends MapFunction<Tuple1<Long>, Tuple2<Long, Long>> {
+
+ @Override
+ public Tuple2<Long, Long> map(Tuple1<Long> value) throws Exception {
+ return new Tuple2<Long, Long>(value.f0, value.f0);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/MultipleIterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/MultipleIterationsCompilerTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/MultipleIterationsCompilerTest.java
deleted file mode 100644
index 5cf1425..0000000
--- a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/MultipleIterationsCompilerTest.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/***********************************************************************************************************************
- *
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- *
- **********************************************************************************************************************/
-
-package eu.stratosphere.pact.compiler;
-
-import static org.junit.Assert.*;
-
-import java.util.Iterator;
-
-import org.junit.Test;
-
-import eu.stratosphere.api.common.Plan;
-import eu.stratosphere.api.java.DataSet;
-import eu.stratosphere.api.java.DeltaIteration;
-import eu.stratosphere.api.java.ExecutionEnvironment;
-import eu.stratosphere.api.java.IterativeDataSet;
-import eu.stratosphere.api.java.aggregation.Aggregations;
-import eu.stratosphere.api.java.functions.FlatMapFunction;
-import eu.stratosphere.api.java.functions.FunctionAnnotation.ConstantFields;
-import eu.stratosphere.api.java.functions.GroupReduceFunction;
-import eu.stratosphere.api.java.functions.JoinFunction;
-import eu.stratosphere.api.java.functions.MapFunction;
-import eu.stratosphere.api.java.tuple.Tuple1;
-import eu.stratosphere.api.java.tuple.Tuple2;
-import eu.stratosphere.compiler.plan.OptimizedPlan;
-import eu.stratosphere.compiler.plan.WorksetIterationPlanNode;
-import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
-import eu.stratosphere.util.Collector;
-
-
-@SuppressWarnings({"serial", "unchecked"})
-public class MultipleIterationsCompilerTest extends CompilerTestBase {
-
- @Test
- public void testTwoIterationsWithMapperInbetween() throws Exception {
- try {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(8);
-
- DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-
- DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-
- DataSet<Tuple2<Long, Long>> bulkResult = doBulkIteration(verticesWithInitialId, edges);
-
- DataSet<Tuple2<Long, Long>> mappedBulk = bulkResult.map(new DummyMap());
-
- DataSet<Tuple2<Long, Long>> depResult = doDeltaIteration(mappedBulk, edges);
-
- depResult.print();
-
- Plan p = env.createProgramPlan();
- OptimizedPlan op = compileNoStats(p);
-
- assertEquals(1, op.getDataSinks().size());
- assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof WorksetIterationPlanNode);
-
- WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
-
- assertEquals(ShipStrategyType.PARTITION_HASH, wipn.getInput1().getShipStrategy());
- assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testTwoIterationsDirectlyChained() throws Exception {
- try {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(8);
-
- DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-
- DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-
- DataSet<Tuple2<Long, Long>> bulkResult = doBulkIteration(verticesWithInitialId, edges);
-
- DataSet<Tuple2<Long, Long>> depResult = doDeltaIteration(bulkResult, edges);
-
- depResult.print();
-
- Plan p = env.createProgramPlan();
- OptimizedPlan op = compileNoStats(p);
-
- assertEquals(1, op.getDataSinks().size());
- assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof WorksetIterationPlanNode);
-
- WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
-
- assertEquals(ShipStrategyType.PARTITION_HASH, wipn.getInput1().getShipStrategy());
- assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testTwoWorksetIterationsDirectlyChained() throws Exception {
- try {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(8);
-
- DataSet<Tuple2<Long, Long>> verticesWithInitialId = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-
- DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
-
- DataSet<Tuple2<Long, Long>> firstResult = doDeltaIteration(verticesWithInitialId, edges);
-
- DataSet<Tuple2<Long, Long>> secondResult = doDeltaIteration(firstResult, edges);
-
- secondResult.print();
-
- Plan p = env.createProgramPlan();
- OptimizedPlan op = compileNoStats(p);
-
- assertEquals(1, op.getDataSinks().size());
- assertTrue(op.getDataSinks().iterator().next().getInput().getSource() instanceof WorksetIterationPlanNode);
-
- WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
-
- assertEquals(ShipStrategyType.FORWARD, wipn.getInput1().getShipStrategy());
- assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- public static DataSet<Tuple2<Long, Long>> doBulkIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {
-
- // open a bulk iteration
- IterativeDataSet<Tuple2<Long, Long>> iteration = vertices.iterate(100);
-
- DataSet<Tuple2<Long, Long>> changes = iteration
- .join(edges).where(0).equalTo(0).with(new Join222())
- .groupBy(0).aggregate(Aggregations.MIN, 1)
- .join(iteration).where(0).equalTo(0)
- .flatMap(new FlatMapJoin());
-
- // close the bulk iteration
- return iteration.closeWith(changes);
- }
-
-
- public static DataSet<Tuple2<Long, Long>> doDeltaIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {
-
- DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> depIteration = vertices.iterateDelta(vertices, 100, 0);
-
- DataSet<Tuple1<Long>> candidates = depIteration.getWorkset().join(edges).where(0).equalTo(0)
- .projectSecond(1).types(Long.class);
-
- DataSet<Tuple1<Long>> grouped = candidates.groupBy(0).reduceGroup(new Reduce101());
-
- DataSet<Tuple2<Long, Long>> candidatesDependencies =
- grouped.join(edges).where(0).equalTo(1).projectSecond(0, 1).types(Long.class, Long.class);
-
- DataSet<Tuple2<Long, Long>> verticesWithNewComponents =
- candidatesDependencies.join(depIteration.getSolutionSet()).where(0).equalTo(0)
- .with(new Join222())
- .groupBy(0).aggregate(Aggregations.MIN, 1);
-
- DataSet<Tuple2<Long, Long>> updatedComponentId =
- verticesWithNewComponents.join(depIteration.getSolutionSet()).where(0).equalTo(0)
- .flatMap(new FlatMapJoin());
-
- DataSet<Tuple2<Long, Long>> depResult = depIteration.closeWith(updatedComponentId, updatedComponentId);
-
- return depResult;
-
- }
-
- public static final class Join222 extends JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
-
- @Override
- public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
- return null;
- }
- }
-
- public static final class FlatMapJoin extends FlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
-
- @Override
- public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> value, Collector<Tuple2<Long, Long>> out) {}
- }
-
- public static final class DummyMap extends MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
-
- @Override
- public Tuple2<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
- return value;
- }
- }
-
- @ConstantFields("0")
- public static final class Reduce101 extends GroupReduceFunction<Tuple1<Long>, Tuple1<Long>> {
-
- @Override
- public void reduce(Iterator<Tuple1<Long>> values, Collector<Tuple1<Long>> out) {}
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java
index bb8af9c..c9fd442 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java
@@ -24,6 +24,8 @@ import eu.stratosphere.types.LongValue;
*/
public class WorksetEmptyConvergenceCriterion implements ConvergenceCriterion<LongValue> {
+ private static final long serialVersionUID = 1L;
+
private static final Log log = LogFactory.getLog(WorksetEmptyConvergenceCriterion.class);
public static final String AGGREGATOR_NAME = "pact.runtime.workset-empty-aggregator";
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java
index 6491749..638dd1d 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java
@@ -78,7 +78,6 @@ public class OutputEmitter<T> implements ChannelSelector<SerializationDelegate<T
switch (strategy) {
case FORWARD:
case PARTITION_HASH:
- case PARTITION_LOCAL_HASH:
case PARTITION_RANGE:
case PARTITION_RANDOM:
case BROADCAST:
@@ -103,7 +102,6 @@ public class OutputEmitter<T> implements ChannelSelector<SerializationDelegate<T
case PARTITION_RANDOM:
return robin(numberOfChannels);
case PARTITION_HASH:
- case PARTITION_LOCAL_HASH:
return hashPartitionDefault(record.getInstance(), numberOfChannels);
case PARTITION_RANGE:
return rangePartition(record.getInstance(), numberOfChannels);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputEmitter.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputEmitter.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputEmitter.java
index 7fe35b4..efdb267 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputEmitter.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputEmitter.java
@@ -83,7 +83,6 @@ public class RecordOutputEmitter implements ChannelSelector<Record> {
switch (strategy) {
case FORWARD:
case PARTITION_HASH:
- case PARTITION_LOCAL_HASH:
case PARTITION_RANGE:
case PARTITION_RANDOM:
this.channels = new int[1];
@@ -110,7 +109,6 @@ public class RecordOutputEmitter implements ChannelSelector<Record> {
case PARTITION_RANDOM:
return robin(numberOfChannels);
case PARTITION_HASH:
- case PARTITION_LOCAL_HASH:
return hashPartitionDefault(record, numberOfChannels);
case PARTITION_RANGE:
return rangePartition(record, numberOfChannels);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java
index 2b50779..a86c209 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/ShipStrategyType.java
@@ -40,12 +40,6 @@ public enum ShipStrategyType {
PARTITION_HASH(true, true, true),
/**
- * Repartitioning the data within a local instance with a hash function. Happens for example when the
- * intra-node degree-of-parallelism is increased.
- */
- PARTITION_LOCAL_HASH(false, true, true),
-
- /**
* Partitioning the data in ranges according to a total order.
*/
PARTITION_RANGE(true, true, true),
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31a37393/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java
index c13ddc8..d1b249a 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java
@@ -45,7 +45,7 @@ public class KMeansForTest implements Program{
}
// set up execution environment
- ExecutionEnvironment env = new RemoteEnvironment("localhost", 1, null);
+ ExecutionEnvironment env = new RemoteEnvironment("localhost", 1);
// get input data
DataSet<Point> points = getPointDataSet(env);
[2/6] git commit: [FLINK-935] (continued) Correct feedback property
check for iterative algorithms.
Posted by se...@apache.org.
[FLINK-935] (continued) Correct feedback property check for iterative algorithms.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ca2b287a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ca2b287a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ca2b287a
Branch: refs/heads/master
Commit: ca2b287a7a78328ebf43766b9fdf39b56fb5fd4f
Parents: ed7c3f1
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Jun 16 15:05:10 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jun 17 02:07:37 2014 +0200
----------------------------------------------------------------------
.../api/avro/AvroOutputFormatTest.java | 1 +
.../compiler/dag/BulkIterationNode.java | 61 +-
.../stratosphere/compiler/dag/DataSinkNode.java | 2 +
.../compiler/dag/SingleInputNode.java | 2 +
.../stratosphere/compiler/dag/TwoInputNode.java | 17 +-
.../compiler/dag/WorksetIterationNode.java | 55 +-
.../dataproperties/LocalProperties.java | 2 +-
.../RequestedGlobalProperties.java | 2 -
.../RequestedLocalProperties.java | 8 +-
.../eu/stratosphere/compiler/plan/PlanNode.java | 103 +-
.../compiler/CoGroupSolutionSetFirstTest.java | 1 -
.../compiler/FeedbackPropertiesMatchTest.java | 1432 ++++++++++++++++++
.../pact/compiler/IterationsCompilerTest.java | 3 -
.../testfunctions/DummyJoinFunction.java | 28 +
.../example/java/graph/PageRankBasic.java | 8 +-
.../pact/runtime/cache/FileCache.java | 9 +-
.../pact/runtime/shipping/ShipStrategyType.java | 2 +-
.../netty/NettyConnectionManagerTest.java | 2 +-
.../iterations/ConnectedComponentsTest.java | 4 +-
.../iterations/PageRankCompilerTest.java | 108 ++
20 files changed, 1807 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroOutputFormatTest.java b/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroOutputFormatTest.java
index b0e4b6e..20610a2 100644
--- a/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroOutputFormatTest.java
+++ b/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroOutputFormatTest.java
@@ -32,6 +32,7 @@ import java.util.ArrayList;
import java.util.List;
+@SuppressWarnings("serial")
public class AvroOutputFormatTest extends JavaProgramTestBase {
public static String outputPath1;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
index b60f427..f6720ea 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
@@ -13,12 +13,14 @@
package eu.stratosphere.compiler.dag;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import eu.stratosphere.api.common.operators.base.BulkIterationBase;
+import eu.stratosphere.api.common.operators.util.FieldList;
import eu.stratosphere.compiler.CompilerException;
import eu.stratosphere.compiler.DataStatistics;
import eu.stratosphere.compiler.PactCompiler.InterestingPropertyVisitor;
@@ -36,6 +38,9 @@ import eu.stratosphere.compiler.plan.BulkPartialSolutionPlanNode;
import eu.stratosphere.compiler.plan.Channel;
import eu.stratosphere.compiler.plan.NamedChannel;
import eu.stratosphere.compiler.plan.PlanNode;
+import eu.stratosphere.compiler.plan.SingleInputPlanNode;
+import eu.stratosphere.compiler.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport;
+import eu.stratosphere.pact.runtime.task.DriverStrategy;
import eu.stratosphere.util.Visitor;
/**
@@ -271,6 +276,9 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
// 1) Because we enumerate multiple times, we may need to clean the cached plans
// before starting another enumeration
this.nextPartialSolution.accept(PlanCacheCleaner.INSTANCE);
+ if (this.terminationCriterion != null) {
+ this.terminationCriterion.accept(PlanCacheCleaner.INSTANCE);
+ }
// 2) Give the partial solution the properties of the current candidate for the initial partial solution
this.partialSolution.setCandidateProperties(in.getGlobalProperties(), in.getLocalProperties(), in);
@@ -282,14 +290,54 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
// 4) Make sure that the beginning of the step function does not assume properties that
// are not also produced by the end of the step function.
- for (Iterator<PlanNode> planDeleter = candidates.iterator(); planDeleter.hasNext(); ) {
- PlanNode candidate = planDeleter.next();
+ {
+ List<PlanNode> newCandidates = new ArrayList<PlanNode>();
- // quick-check if the properties at the end of the step function are the same as at the beginning
- if (candidate.getGlobalProperties().equals(pspn.getGlobalProperties()) && candidate.getLocalProperties().equals(pspn.getLocalProperties())) {
- continue;
+ for (Iterator<PlanNode> planDeleter = candidates.iterator(); planDeleter.hasNext(); ) {
+ PlanNode candidate = planDeleter.next();
+
+ GlobalProperties atEndGlobal = candidate.getGlobalProperties();
+ LocalProperties atEndLocal = candidate.getLocalProperties();
+
+ FeedbackPropertiesMeetRequirementsReport report = candidate.checkPartialSolutionPropertiesMet(pspn, atEndGlobal, atEndLocal);
+ if (report == FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION) {
+ ; // depends only through broadcast variable on the partial solution
+ }
+ else if (report == FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
+ // attach a no-op node through which we create the properties of the original input
+ Channel toNoOp = new Channel(candidate);
+ globPropsReq.parameterizeChannel(toNoOp, false, false);
+ locPropsReq.parameterizeChannel(toNoOp);
+
+ UnaryOperatorNode rebuildPropertiesNode = new UnaryOperatorNode("Rebuild Partial Solution Properties", FieldList.EMPTY_LIST);
+ rebuildPropertiesNode.setDegreeOfParallelism(candidate.getDegreeOfParallelism());
+ rebuildPropertiesNode.setSubtasksPerInstance(candidate.getSubtasksPerInstance());
+
+ SingleInputPlanNode rebuildPropertiesPlanNode = new SingleInputPlanNode(rebuildPropertiesNode, "Rebuild Partial Solution Properties", toNoOp, DriverStrategy.UNARY_NO_OP);
+ rebuildPropertiesPlanNode.initProperties(toNoOp.getGlobalProperties(), toNoOp.getLocalProperties());
+ estimator.costOperator(rebuildPropertiesPlanNode);
+
+ GlobalProperties atEndGlobalModified = rebuildPropertiesPlanNode.getGlobalProperties();
+ LocalProperties atEndLocalModified = rebuildPropertiesPlanNode.getLocalProperties();
+
+ if (!(atEndGlobalModified.equals(atEndGlobal) && atEndLocalModified.equals(atEndLocal))) {
+ FeedbackPropertiesMeetRequirementsReport report2 = candidate.checkPartialSolutionPropertiesMet(pspn, atEndGlobalModified, atEndLocalModified);
+
+ if (report2 != FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
+ newCandidates.add(rebuildPropertiesPlanNode);
+ }
+ }
+
+ // the original candidate violates the requirements; only its rebuilt variant (if any) may remain
+ planDeleter.remove();
+ }
}
- planDeleter.remove();
+ // re-admit the rebuilt candidates; done outside the loop because 'candidates' is being iterated there
+ candidates.addAll(newCandidates);
+ }
+
+ if (candidates.isEmpty()) {
+ return;
}
// 5) Create a candidate for the Iteration Node for every remaining plan of the step function.
@@ -302,13 +347,13 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
target.add(node);
}
}
- else if(candidates.size() > 0) {
+ else if (candidates.size() > 0) {
List<PlanNode> terminationCriterionCandidates = this.terminationCriterion.getAlternativePlans(estimator);
SingleRootJoiner singleRoot = (SingleRootJoiner) this.singleRoot;
for (PlanNode candidate : candidates) {
- for(PlanNode terminationCandidate : terminationCriterionCandidates) {
+ for (PlanNode terminationCandidate : terminationCriterionCandidates) {
if (singleRoot.areBranchCompatible(candidate, terminationCandidate)) {
BulkIterationPlanNode node = new BulkIterationPlanNode(this, "BulkIteration ("+this.getPactContract().getName()+")", in, pspn, candidate, terminationCandidate);
GlobalProperties gProps = candidate.getGlobalProperties().clone();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
index 15c7670..fe823d2 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
@@ -210,6 +210,8 @@ public class DataSinkNode extends OptimizerNode {
Channel c = new Channel(p);
gp.parameterizeChannel(c, globalDopChange, localDopChange);
lp.parameterizeChannel(c);
+ c.setRequiredLocalProps(lp);
+ c.setRequiredGlobalProps(gp);
// no need to check whether the created properties meet what we need in case
// of ordering or global ordering, because the only interesting properties we have
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
index fd4b76a..e1727f9 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
@@ -312,6 +312,7 @@ public abstract class SingleInputNode extends OptimizerNode {
// requested properties
for (RequestedGlobalProperties rgps: allValidGlobals) {
if (rgps.isMetBy(c.getGlobalProperties())) {
+ c.setRequiredGlobalProps(rgps);
addLocalCandidates(c, broadcastPlanChannels, igps, outputPlans, estimator);
break;
}
@@ -365,6 +366,7 @@ public abstract class SingleInputNode extends OptimizerNode {
for (OperatorDescriptorSingle dps: getPossibleProperties()) {
for (RequestedLocalProperties ilps : dps.getPossibleLocalProperties()) {
if (ilps.isMetBy(in.getLocalProperties())) {
+ in.setRequiredLocalProps(ilps);
instantiateCandidate(dps, in, broadcastPlanChannels, target, estimator, rgps, ilp);
break outer;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
index cccbeba..9898c81 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
@@ -447,9 +447,13 @@ public abstract class TwoInputNode extends OptimizerNode {
if (gpp.getProperties1().isMetBy(c1.getGlobalProperties()) &&
gpp.getProperties2().isMetBy(c2.getGlobalProperties()) )
{
+ Channel c1Clone = c1.clone();
+ c1Clone.setRequiredGlobalProps(gpp.getProperties1());
+ c2.setRequiredGlobalProps(gpp.getProperties2());
+
// we form a valid combination, so create the local candidates
// for this
- addLocalCandidates(c1, c2, broadcastPlanChannels, igps1, igps2, outputPlans, allLocalPairs, estimator);
+ addLocalCandidates(c1Clone, c2, broadcastPlanChannels, igps1, igps2, outputPlans, allLocalPairs, estimator);
break;
}
}
@@ -495,7 +499,6 @@ public abstract class TwoInputNode extends OptimizerNode {
final Channel in2 = template2.clone();
ilp2.parameterizeChannel(in2);
- allPossibleLoop:
for (OperatorDescriptorDual dps: this.possibleProperties) {
for (LocalPropertiesPair lpp : dps.getPossibleLocalProperties()) {
if (lpp.getProperties1().isMetBy(in1.getLocalProperties()) &&
@@ -507,12 +510,19 @@ public abstract class TwoInputNode extends OptimizerNode {
if (dps.areCoFulfilled(lpp.getProperties1(), lpp.getProperties2(),
in1.getLocalProperties(), in2.getLocalProperties()))
{
+ // copy both channels: setting the required properties mutates the channel, and because the
+ // loop over the operator descriptors continues after a match, in1 and in2 are reused later
+ Channel in1Copy = in1.clone();
+ in1Copy.setRequiredLocalProps(lpp.getProperties1());
+ Channel in2Copy = in2.clone();
+ in2Copy.setRequiredLocalProps(lpp.getProperties2());
+
// all right, co compatible
- instantiate(dps, in1, in2, broadcastPlanChannels, target, estimator, rgps1, rgps2, ilp1, ilp2);
- break allPossibleLoop;
+ instantiate(dps, in1Copy, in2Copy, broadcastPlanChannels, target, estimator, rgps1, rgps2, ilp1, ilp2);
+ break;
} else {
// meet, but not co-compatible
- throw new CompilerException("Implements to adjust one side to the other!");
+ // adjusting one side to the other is not implemented, so this combination is skipped
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
index 76c4402..2c70794 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
@@ -41,6 +41,7 @@ import eu.stratosphere.compiler.plan.SingleInputPlanNode;
import eu.stratosphere.compiler.plan.SolutionSetPlanNode;
import eu.stratosphere.compiler.plan.WorksetIterationPlanNode;
import eu.stratosphere.compiler.plan.WorksetPlanNode;
+import eu.stratosphere.compiler.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport;
import eu.stratosphere.compiler.util.NoOpBinaryUdfOp;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
import eu.stratosphere.pact.runtime.task.DriverStrategy;
@@ -279,7 +280,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
@Override
protected void instantiate(OperatorDescriptorDual operator, Channel solutionSetIn, Channel worksetIn,
List<Set<? extends NamedChannel>> broadcastPlanChannels, List<PlanNode> target, CostEstimator estimator,
- RequestedGlobalProperties globPropsReqSolutionSet,RequestedGlobalProperties globPropsReqWorkset,
+ RequestedGlobalProperties globPropsReqSolutionSet, RequestedGlobalProperties globPropsReqWorkset,
RequestedLocalProperties locPropsReqSolutionSet, RequestedLocalProperties locPropsReqWorkset)
{
// check for pipeline breaking using hash join with build on the solution set side
@@ -314,12 +315,54 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
// initial partial solution
// Make sure that the workset candidates fulfill the input requirements
- for (Iterator<PlanNode> planDeleter = worksetCandidates.iterator(); planDeleter.hasNext(); ) {
- PlanNode candidate = planDeleter.next();
- if (!(globPropsReqWorkset.isMetBy(candidate.getGlobalProperties()) && locPropsReqWorkset.isMetBy(candidate.getLocalProperties()))) {
- planDeleter.remove();
+ {
+ List<PlanNode> newCandidates = new ArrayList<PlanNode>();
+
+ for (Iterator<PlanNode> planDeleter = worksetCandidates.iterator(); planDeleter.hasNext(); ) {
+ PlanNode candidate = planDeleter.next();
+
+ GlobalProperties atEndGlobal = candidate.getGlobalProperties();
+ LocalProperties atEndLocal = candidate.getLocalProperties();
+
+ FeedbackPropertiesMeetRequirementsReport report = candidate.checkPartialSolutionPropertiesMet(wspn, atEndGlobal, atEndLocal);
+ if (report == FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION) {
+ ; // depends only through broadcast variable on the workset solution
+ }
+ else if (report == FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
+ // attach a no-op node through which we create the properties of the original input
+ Channel toNoOp = new Channel(candidate);
+ globPropsReqWorkset.parameterizeChannel(toNoOp, false, false);
+ locPropsReqWorkset.parameterizeChannel(toNoOp);
+
+ UnaryOperatorNode rebuildWorksetPropertiesNode = new UnaryOperatorNode("Rebuild Workset Properties", FieldList.EMPTY_LIST);
+
+ rebuildWorksetPropertiesNode.setDegreeOfParallelism(candidate.getDegreeOfParallelism());
+ rebuildWorksetPropertiesNode.setSubtasksPerInstance(candidate.getSubtasksPerInstance());
+
+ SingleInputPlanNode rebuildWorksetPropertiesPlanNode = new SingleInputPlanNode(rebuildWorksetPropertiesNode, "Rebuild Workset Properties", toNoOp, DriverStrategy.UNARY_NO_OP);
+ rebuildWorksetPropertiesPlanNode.initProperties(toNoOp.getGlobalProperties(), toNoOp.getLocalProperties());
+ estimator.costOperator(rebuildWorksetPropertiesPlanNode);
+
+ GlobalProperties atEndGlobalModified = rebuildWorksetPropertiesPlanNode.getGlobalProperties();
+ LocalProperties atEndLocalModified = rebuildWorksetPropertiesPlanNode.getLocalProperties();
+
+ if (!(atEndGlobalModified.equals(atEndGlobal) && atEndLocalModified.equals(atEndLocal))) {
+ FeedbackPropertiesMeetRequirementsReport report2 = candidate.checkPartialSolutionPropertiesMet(wspn, atEndGlobalModified, atEndLocalModified);
+
+ if (report2 != FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
+ newCandidates.add(rebuildWorksetPropertiesPlanNode);
+ }
+ }
+
+ // remove the original operator and add the modified candidate
+ planDeleter.remove();
+
+ }
}
+
+ worksetCandidates.addAll(newCandidates);
}
+
if (worksetCandidates.isEmpty()) {
return;
}
@@ -342,7 +385,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
gp.setHashPartitioned(this.solutionSetKeyFields);
gp.addUniqueFieldCombination(this.solutionSetKeyFields);
- LocalProperties lp = LocalProperties.TRIVIAL.addUniqueFields(this.solutionSetKeyFields);
+ LocalProperties lp = LocalProperties.EMPTY.addUniqueFields(this.solutionSetKeyFields);
// take all combinations of solution set delta and workset plans
for (PlanNode solutionSetCandidate : solutionSetDeltaCandidates) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java
index b45e341..f49c130 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java
@@ -27,7 +27,7 @@ import eu.stratosphere.compiler.dag.OptimizerNode;
*/
public class LocalProperties implements Cloneable {
- public static final LocalProperties TRIVIAL = new LocalProperties();
+ public static final LocalProperties EMPTY = new LocalProperties();
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java
index 3935f5e..574922a 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java
@@ -229,8 +229,6 @@ public final class RequestedGlobalProperties implements Cloneable {
final GlobalProperties inGlobals = channel.getSource().getGlobalProperties();
// if we have no global parallelism change, check if we have already compatible global properties
if (!globalDopChange && !localDopChange && isMetBy(inGlobals)) {
- channel.setRequiredGlobalProps(this);
-
// we meet already everything, so go forward
channel.setShipStrategy(ShipStrategyType.FORWARD);
return;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java
index e714cea..aeae0d2 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedLocalProperties.java
@@ -33,9 +33,9 @@ public class RequestedLocalProperties implements Cloneable {
// --------------------------------------------------------------------------------------------
- Ordering ordering; // order inside a partition, null if not ordered
+ private Ordering ordering; // order inside a partition, null if not ordered
- FieldSet groupedFields; // fields by which the stream is grouped. null if not grouped.
+ private FieldSet groupedFields; // fields by which the stream is grouped. null if not grouped.
// --------------------------------------------------------------------------------------------
@@ -190,8 +190,8 @@ public class RequestedLocalProperties implements Cloneable {
LocalProperties current = channel.getLocalProperties();
if (isMetBy(current)) {
- // we are met. record that this is needed.
- channel.setRequiredLocalProps(this);
+ // we are met, all is good
+ channel.setLocalStrategy(LocalStrategy.NONE);
}
else if (this.ordering != null) {
channel.setLocalStrategy(LocalStrategy.SORT, this.ordering.getInvolvedIndexes(), this.ordering.getFieldSortDirections());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java
index 16241e4..69263bc 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java
@@ -29,7 +29,9 @@ import eu.stratosphere.compiler.dataproperties.GlobalProperties;
import eu.stratosphere.compiler.dataproperties.LocalProperties;
import eu.stratosphere.compiler.plandump.DumpableConnection;
import eu.stratosphere.compiler.plandump.DumpableNode;
+import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
import eu.stratosphere.pact.runtime.task.DriverStrategy;
+import eu.stratosphere.pact.runtime.task.util.LocalStrategy;
import eu.stratosphere.util.Visitable;
/**
@@ -434,8 +436,85 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
// --------------------------------------------------------------------------------------------
+ /**
+ * Checks whether this node has a dam on the way down to the given source node. This method
+ * returns either that (a) the source node is not found as a (transitive) child of this node,
+ * (b) the node is found, but no dam is on the path, or (c) the node is found and a dam is on
+ * the path.
+ *
+ * @param source The node on the path to which the dam is sought.
+ * @return The result whether the node is found and whether a dam is on the path.
+ */
public abstract SourceAndDamReport hasDamOnPathDownTo(PlanNode source);
+ /**
+ * Checks whether the given feedback properties of an iteration (the properties produced at the
+ * end of the step function) satisfy the requirements that this node and its transitive inputs
+ * place on the data coming from the given partial solution (or workset) node.
+ * <p>
+ * The check recurses through the channels returned by {@link #getInputs()}. A channel on a path
+ * from the partial solution whose ship strategy is neither {@code FORWARD} nor {@code NONE}
+ * introduces new properties, so the requirements above it are not checked against the feedback
+ * properties. For a forwarding channel, its required global properties must be met by
+ * {@code feedbackGlobal}. Unless a local strategy on that channel (or below it) re-establishes
+ * the local properties, its required local properties must also be met by {@code feedbackLocal}.
+ * <p>
+ * NOTE(review): visited nodes are not memoized, so a sub-plan shared by several paths is
+ * checked once per path.
+ *
+ * @param partialSolution The plan node of the partial solution (or workset) inside the step function.
+ * @param feedbackGlobal The global properties that would be fed back into the partial solution.
+ * @param feedbackLocal The local properties that would be fed back into the partial solution.
+ * @return {@code NO_PARTIAL_SOLUTION} if the partial solution is not reached through the inputs;
+ * {@code NOT_MET} if some requirement is violated; {@code MET} if every path from the
+ * partial solution passes a non-forwarding channel; {@code PENDING_LOCAL_MET} if some
+ * path still forwards the feedback, but the local properties are re-established on all
+ * such paths; {@code PENDING} otherwise, including when this node is the partial solution.
+ */
+ public FeedbackPropertiesMeetRequirementsReport checkPartialSolutionPropertiesMet(PlanNode partialSolution, GlobalProperties feedbackGlobal, LocalProperties feedbackLocal) {
+ if (this == partialSolution) {
+ return FeedbackPropertiesMeetRequirementsReport.PENDING;
+ }
+
+ boolean found = false;
+ boolean allMet = true;
+ boolean allLocallyMet = true;
+
+ for (Channel input : getInputs()) {
+ FeedbackPropertiesMeetRequirementsReport inputState = input.getSource().checkPartialSolutionPropertiesMet(partialSolution, feedbackGlobal, feedbackLocal);
+
+ if (inputState == FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION) {
+ continue;
+ }
+ else if (inputState == FeedbackPropertiesMeetRequirementsReport.MET) {
+ found = true;
+ continue;
+ }
+ else if (inputState == FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
+ return FeedbackPropertiesMeetRequirementsReport.NOT_MET;
+ }
+ else {
+ found = true;
+
+ // the partial solution was on the path here. check whether the channel requires
+ // certain properties that are met, or whether the channel introduces new properties
+
+ // if the plan introduces new global properties, then we can stop looking whether
+ // the feedback properties are sufficient to meet the requirements
+ if (input.getShipStrategy() != ShipStrategyType.FORWARD && input.getShipStrategy() != ShipStrategyType.NONE) {
+ continue;
+ }
+
+ // first check whether this channel requires something that is not met
+ if (input.getRequiredGlobalProps() != null && !input.getRequiredGlobalProps().isMetBy(feedbackGlobal)) {
+ return FeedbackPropertiesMeetRequirementsReport.NOT_MET;
+ }
+
+ // the feedback properties reach this node unchanged on this path, so the result cannot be MET
+ allMet = false;
+
+ // if the plan introduces new local properties, we can stop checking for matching local properties
+ if (inputState != FeedbackPropertiesMeetRequirementsReport.PENDING_LOCAL_MET) {
+
+ if (input.getLocalStrategy() == LocalStrategy.NONE) {
+
+ if (input.getRequiredLocalProps() != null && !input.getRequiredLocalProps().isMetBy(feedbackLocal)) {
+ return FeedbackPropertiesMeetRequirementsReport.NOT_MET;
+ }
+
+ allLocallyMet = false;
+ }
+ }
+ }
+ }
+
+ if (!found) {
+ return FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION;
+ } else if (allMet) {
+ return FeedbackPropertiesMeetRequirementsReport.MET;
+ } else if (allLocallyMet) {
+ return FeedbackPropertiesMeetRequirementsReport.PENDING_LOCAL_MET;
+ } else {
+ return FeedbackPropertiesMeetRequirementsReport.PENDING;
+ }
+ }
+
// --------------------------------------------------------------------------------------------
@@ -447,19 +526,16 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
// --------------------------------------------------------------------------------------------
-
@Override
public OptimizerNode getOptimizerNode() {
return this.template;
}
-
@Override
public PlanNode getPlanNode() {
return this;
}
-
@Override
public Iterable<DumpableConnection<PlanNode>> getDumpableInputs() {
List<DumpableConnection<PlanNode>> allInputs = new ArrayList<DumpableConnection<PlanNode>>();
@@ -480,4 +556,25 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
public static enum SourceAndDamReport {
NOT_FOUND, FOUND_SOURCE, FOUND_SOURCE_AND_DAM;
}
+
+
+
+ /**
+ * Result of {@link PlanNode#checkPartialSolutionPropertiesMet(PlanNode, GlobalProperties, LocalProperties)}:
+ * whether the properties fed back at the end of an iteration's step function satisfy the
+ * requirements that the step function places on its partial solution.
+ */
+ public static enum FeedbackPropertiesMeetRequirementsReport {
+ /** Indicates that the path is irrelevant, because the partial solution is not reached through it. */
+ NO_PARTIAL_SOLUTION,
+
+ /** Indicates that the partial solution was reached, and whether the requirements are met
+ * still depends on both the global and the local feedback properties. */
+ PENDING,
+
+ /** Indicates that the partial solution was reached and the local requirements are already
+ * satisfied, while the global requirements still depend on the global feedback properties. */
+ PENDING_LOCAL_MET,
+
+ /** Indicates that the requirements have been determined to be met. */
+ MET,
+
+ /** Indicates that the requirements have been determined not to be met. */
+ NOT_MET;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ca2b287a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java
index c2b377e..1c99558 100644
--- a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java
@@ -76,7 +76,6 @@ public class CoGroupSolutionSetFirstTest extends CompilerTestBase {
oPlan.accept(new Visitor<PlanNode>() {
@Override
public boolean preVisit(PlanNode visitable) {
- System.out.println(visitable);
if (visitable instanceof WorksetIterationPlanNode) {
PlanNode deltaNode = ((WorksetIterationPlanNode) visitable).getSolutionSetDeltaPlanNode();
[4/6] git commit: Minor fix for displayed estimates.
Posted by se...@apache.org.
Minor fix for displayed estimates.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/229754d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/229754d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/229754d2
Branch: refs/heads/master
Commit: 229754d20076fd24f97c3c0b87cbf18828de8e08
Parents: 708426b
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jun 13 16:54:42 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jun 17 02:07:37 2014 +0200
----------------------------------------------------------------------
.../main/java/eu/stratosphere/compiler/dag/OptimizerNode.java | 7 +++++++
1 file changed, 7 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/229754d2/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java
index f2a9053..70471c1 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java
@@ -630,6 +630,13 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
// let every operator do its computation
computeOperatorSpecificDefaultEstimates(statistics);
+ if (this.estimatedOutputSize < 0) {
+ this.estimatedOutputSize = -1;
+ }
+ if (this.estimatedNumRecords < 0) {
+ this.estimatedNumRecords = -1;
+ }
+
// overwrite default estimates with hints, if given
if (getPactContract() == null || getPactContract().getCompilerHints() == null) {
return ;
[5/6] git commit: Made LocalProperties Immutable (avoid side effect
mutation bugs); Change several Iterators to Iterables in the compiler (simpler
code)
Posted by se...@apache.org.
Made LocalProperties Immutable (avoid side effect mutation bugs)
Change several Iterators to Iterables in the compiler (simpler code)
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ed7c3f15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ed7c3f15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ed7c3f15
Branch: refs/heads/master
Commit: ed7c3f156ec632af6f5764fad8650a771b69c1d7
Parents: 202aa4a
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Jun 16 09:13:59 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jun 17 02:07:37 2014 +0200
----------------------------------------------------------------------
.../eu/stratosphere/compiler/PactCompiler.java | 9 +-
.../compiler/costs/CostEstimator.java | 5 +-
.../compiler/dag/OptimizerNode.java | 8 +-
.../compiler/dag/WorksetIterationNode.java | 3 +-
.../dataproperties/LocalProperties.java | 133 +++++++++----------
.../operators/AllGroupReduceProperties.java | 3 +-
.../AllGroupWithPartialPreGroupProperties.java | 3 +-
.../compiler/operators/CoGroupDescriptor.java | 3 +-
.../operators/CollectorMapDescriptor.java | 3 +-
.../CrossStreamOuterFirstDescriptor.java | 6 +-
.../CrossStreamOuterSecondDescriptor.java | 6 +-
.../compiler/operators/FlatMapDescriptor.java | 3 +-
.../operators/GroupReduceProperties.java | 3 +-
.../GroupReduceWithCombineProperties.java | 3 +-
.../operators/PartialGroupProperties.java | 3 +-
.../compiler/operators/ReduceProperties.java | 3 +-
.../operators/SortMergeJoinDescriptor.java | 3 +-
.../plan/BulkPartialSolutionPlanNode.java | 9 +-
.../eu/stratosphere/compiler/plan/Channel.java | 3 +-
.../compiler/plan/DualInputPlanNode.java | 58 +-------
.../compiler/plan/NAryUnionPlanNode.java | 14 +-
.../eu/stratosphere/compiler/plan/PlanNode.java | 22 +--
.../compiler/plan/SingleInputPlanNode.java | 53 +-------
.../compiler/plan/SolutionSetPlanNode.java | 9 +-
.../compiler/plan/SourcePlanNode.java | 9 +-
.../compiler/plan/WorksetPlanNode.java | 9 +-
.../compiler/plandump/DumpableNode.java | 6 +-
.../plandump/PlanJSONDumpGenerator.java | 9 +-
.../plantranslate/NepheleJobGraphGenerator.java | 10 +-
.../postpass/GenericFlatTypePostPass.java | 5 +-
.../compiler/postpass/JavaApiPostPass.java | 5 +-
.../compiler/CoGroupSolutionSetFirstTest.java | 2 +-
.../compiler/UnionPropertyPropagationTest.java | 5 +-
.../eu/stratosphere/util/IterableIterator.java | 26 ++++
34 files changed, 181 insertions(+), 273 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java
index 8399fed..96eb01d 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java
@@ -1284,8 +1284,7 @@ public class PactCompiler {
}
// assign memory to the local and global strategies of the channels
- for (Iterator<Channel> channels = node.getInputs(); channels.hasNext();) {
- final Channel c = channels.next();
+ for (Channel c : node.getInputs()) {
if (c.getLocalStrategy().dams()) {
final long mem = memoryPerInstanceAndWeight / node.getSubtasksPerInstance();
c.setMemoryLocalStrategy(mem);
@@ -1362,8 +1361,7 @@ public class PactCompiler {
// double-connect the connections. previously, only parents knew their children, because
// one child candidate could have been referenced by multiple parents.
- for (Iterator<Channel> iter = visitable.getInputs(); iter.hasNext();) {
- final Channel conn = iter.next();
+ for (Channel conn : visitable.getInputs()) {
conn.setTarget(visitable);
conn.getSource().addOutgoingChannel(conn);
}
@@ -1375,8 +1373,7 @@ public class PactCompiler {
// count the memory consumption
this.memoryConsumerWeights += visitable.getMemoryConsumerWeight();
- for (Iterator<Channel> channels = visitable.getInputs(); channels.hasNext();) {
- final Channel c = channels.next();
+ for (Channel c : visitable.getInputs()) {
if (c.getLocalStrategy().dams()) {
this.memoryConsumerWeights++;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java
index 11fb45b..3e1d94a 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java
@@ -71,8 +71,7 @@ public abstract class CostEstimator {
final long availableMemory = n.getGuaranteedAvailableMemory();
// add the shipping strategy costs
- for (Iterator<Channel> channels = n.getInputs(); channels.hasNext(); ) {
- final Channel channel = channels.next();
+ for (Channel channel : n.getInputs()) {
final Costs costs = new Costs();
// Plans that apply the same strategies, but at different points
@@ -139,7 +138,7 @@ public abstract class CostEstimator {
// get the inputs, if we have some
{
- Iterator<Channel> channels = n.getInputs();
+ Iterator<Channel> channels = n.getInputs().iterator();
if (channels.hasNext()) {
firstInput = channels.next();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java
index 70471c1..b2c9330 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java
@@ -285,7 +285,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
// ------------------------------------------------------------------------
@Override
- public Iterator<OptimizerNode> getPredecessors() {
+ public Iterable<OptimizerNode> getPredecessors() {
List<OptimizerNode> allPredecessors = new ArrayList<OptimizerNode>();
for (Iterator<PactConnection> inputs = getIncomingConnections().iterator(); inputs.hasNext(); ){
@@ -296,7 +296,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
allPredecessors.add(conn.getSource());
}
- return allPredecessors.iterator();
+ return allPredecessors;
}
/**
@@ -1208,13 +1208,13 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
}
@Override
- public Iterator<DumpableConnection<OptimizerNode>> getDumpableInputs() {
+ public Iterable<DumpableConnection<OptimizerNode>> getDumpableInputs() {
List<DumpableConnection<OptimizerNode>> allInputs = new ArrayList<DumpableConnection<OptimizerNode>>();
allInputs.addAll(getIncomingConnections());
allInputs.addAll(getBroadcastConnections());
- return allInputs.iterator();
+ return allInputs;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
index 94580ad..76c4402 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java
@@ -342,8 +342,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
gp.setHashPartitioned(this.solutionSetKeyFields);
gp.addUniqueFieldCombination(this.solutionSetKeyFields);
- final LocalProperties lp = new LocalProperties();
- lp.addUniqueFields(this.solutionSetKeyFields);
+ LocalProperties lp = LocalProperties.TRIVIAL.addUniqueFields(this.solutionSetKeyFields);
// take all combinations of solution set delta and workset plans
for (PlanNode solutionSetCandidate : solutionSetDeltaCandidates) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java
index d4cbeca..b45e341 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/LocalProperties.java
@@ -25,8 +25,12 @@ import eu.stratosphere.compiler.dag.OptimizerNode;
* This class represents local properties of the data. A local property is a property that exists
* within the data of a single partition.
*/
-public class LocalProperties implements Cloneable
-{
+public class LocalProperties implements Cloneable {
+
+ public static final LocalProperties TRIVIAL = new LocalProperties();
+
+ // --------------------------------------------------------------------------------------------
+
private Ordering ordering; // order inside a partition, null if not ordered
private FieldList groupedFields; // fields by which the stream is grouped. null if not grouped.
@@ -39,23 +43,6 @@ public class LocalProperties implements Cloneable
* Default constructor for trivial local properties. No order, no grouping, no uniqueness.
*/
public LocalProperties() {}
-
- /**
- * Creates a new instance of local properties that have the given ordering as property,
- * field grouping and field uniqueness. Any of the given parameters may be null. Beware, though,
- * that a null grouping is inconsistent with a non-null ordering.
- * <p>
- * This constructor is used only for internal copy creation.
- *
- * @param ordering The ordering represented by these local properties.
- * @param groupedFields The grouped fields for these local properties.
- * @param uniqueFields The unique fields for these local properties.
- */
- private LocalProperties(Ordering ordering, FieldList groupedFields, Set<FieldSet> uniqueFields) {
- this.ordering = ordering;
- this.groupedFields = groupedFields;
- this.uniqueFields = uniqueFields;
- }
// --------------------------------------------------------------------------------------------
@@ -69,17 +56,6 @@ public class LocalProperties implements Cloneable
}
/**
- * Sets the key order for these global properties.
- *
- * @param keyOrder
- * The key order to set.
- */
- public void setOrdering(Ordering ordering) {
- this.ordering = ordering;
- this.groupedFields = ordering.getInvolvedIndexes();
- }
-
- /**
* Gets the grouped fields.
*
* @return The grouped fields, or <code>null</code> if nothing is grouped.
@@ -87,15 +63,6 @@ public class LocalProperties implements Cloneable
public FieldList getGroupedFields() {
return this.groupedFields;
}
-
- /**
- * Sets the fields that are grouped in these data properties.
- *
- * @param groupedFields The fields that are grouped in these data properties.
- */
- public void setGroupedFields(FieldList groupedFields) {
- this.groupedFields = groupedFields;
- }
/**
* Gets the fields whose combination is unique within the data set.
@@ -106,6 +73,12 @@ public class LocalProperties implements Cloneable
return this.uniqueFields;
}
+ /**
+ * Checks whether the given set of fields is unique, as specified in these local properties.
+ *
+ * @param set The set to check.
+ * @return True, if the given column combination is unique, false if not.
+ */
public boolean areFieldsUnique(FieldSet set) {
return this.uniqueFields != null && this.uniqueFields.contains(set);
}
@@ -115,42 +88,33 @@ public class LocalProperties implements Cloneable
*
* @param uniqueFields The fields that are unique in these data properties.
*/
- public void addUniqueFields(FieldSet uniqueFields) {
- if (this.uniqueFields == null) {
- this.uniqueFields = new HashSet<FieldSet>();
+ public LocalProperties addUniqueFields(FieldSet uniqueFields) {
+ LocalProperties copy = clone();
+
+ if (copy.uniqueFields == null) {
+ copy.uniqueFields = new HashSet<FieldSet>();
}
- this.uniqueFields.add(uniqueFields);
+ copy.uniqueFields.add(uniqueFields);
+ return copy;
}
- public void clearUniqueFieldSets() {
- if (this.uniqueFields != null) {
- this.uniqueFields = null;
+ public LocalProperties clearUniqueFieldSets() {
+ if (this.uniqueFields == null || this.uniqueFields.isEmpty()) {
+ return this;
+ } else {
+ LocalProperties copy = new LocalProperties();
+ copy.ordering = this.ordering;
+ copy.groupedFields = this.groupedFields;
+ return copy;
}
}
- public boolean areFieldsGrouped(FieldSet set) {
- return this.groupedFields != null && this.groupedFields.isValidUnorderedPrefix(set);
- }
-
- public boolean meetsOrderingConstraint(Ordering o) {
- return o.isMetBy(this.ordering);
- }
-
/**
* Checks, if the properties in this object are trivial, i.e. only standard values.
*/
public boolean isTrivial() {
return ordering == null && this.groupedFields == null && this.uniqueFields == null;
}
-
- /**
- * This method resets the local properties to a state where no properties are given.
- */
- public void reset() {
- this.ordering = null;
- this.groupedFields = null;
- this.uniqueFields = null;
- }
// --------------------------------------------------------------------------------------------
@@ -162,8 +126,7 @@ public class LocalProperties implements Cloneable
*
* @return True, if the resulting properties are non trivial.
*/
- public LocalProperties filterByNodesConstantSet(OptimizerNode node, int input)
- {
+ public LocalProperties filterByNodesConstantSet(OptimizerNode node, int input) {
// check, whether the local order is preserved
Ordering no = this.ordering;
FieldList ngf = this.groupedFields;
@@ -183,7 +146,8 @@ public class LocalProperties implements Cloneable
break;
}
}
- } else if (this.groupedFields != null) {
+ }
+ else if (this.groupedFields != null) {
// check, whether the local key grouping is preserved
for (Integer index : this.groupedFields) {
if (!node.isFieldConstant(input, index)) {
@@ -192,8 +156,7 @@ public class LocalProperties implements Cloneable
}
}
- // check, whether the local key grouping is preserved
- if (this.uniqueFields != null) {
+ if (this.uniqueFields != null && this.uniqueFields.size() > 0) {
Set<FieldSet> s = new HashSet<FieldSet>(this.uniqueFields);
for (FieldSet fields : this.uniqueFields) {
for (Integer index : fields) {
@@ -208,9 +171,15 @@ public class LocalProperties implements Cloneable
}
}
- return (no == this.ordering && ngf == this.groupedFields && nuf == this.uniqueFields) ? this :
- (no == null && ngf == null && nuf == null) ? new LocalProperties() :
- new LocalProperties(no, ngf, nuf);
+ if (no == this.ordering && ngf == this.groupedFields && nuf == this.uniqueFields) {
+ return this;
+ } else {
+ LocalProperties lp = new LocalProperties();
+ lp.ordering = no;
+ lp.groupedFields = ngf;
+ lp.uniqueFields = nuf;
+ return lp;
+ }
}
// --------------------------------------------------------------------------------------------
@@ -245,8 +214,11 @@ public class LocalProperties implements Cloneable
@Override
public LocalProperties clone() {
- return new LocalProperties(this.ordering, this.groupedFields,
- this.uniqueFields == null ? null : new HashSet<FieldSet>(this.uniqueFields));
+ LocalProperties copy = new LocalProperties();
+ copy.ordering = this.ordering;
+ copy.groupedFields = this.groupedFields;
+ copy.uniqueFields = (this.uniqueFields == null ? null : new HashSet<FieldSet>(this.uniqueFields));
+ return copy;
}
// --------------------------------------------------------------------------------------------
@@ -268,4 +240,19 @@ public class LocalProperties implements Cloneable
return lp1;
}
}
+
+ // --------------------------------------------------------------------------------------------
+
+ public static LocalProperties forOrdering(Ordering o) {
+ LocalProperties props = new LocalProperties();
+ props.ordering = o;
+ props.groupedFields = o.getInvolvedIndexes();
+ return props;
+ }
+
+ public static LocalProperties forGrouping(FieldList groupedFields) {
+ LocalProperties props = new LocalProperties();
+ props.groupedFields = groupedFields;
+ return props;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllGroupReduceProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllGroupReduceProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllGroupReduceProperties.java
index ee03ce0..2cc7fe2 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllGroupReduceProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllGroupReduceProperties.java
@@ -62,7 +62,6 @@ public final class AllGroupReduceProperties extends OperatorDescriptorSingle {
@Override
public LocalProperties computeLocalProperties(LocalProperties lProps) {
- lProps.clearUniqueFieldSets();
- return lProps;
+ return lProps.clearUniqueFieldSets();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllGroupWithPartialPreGroupProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllGroupWithPartialPreGroupProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllGroupWithPartialPreGroupProperties.java
index d387c69..b389855 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllGroupWithPartialPreGroupProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllGroupWithPartialPreGroupProperties.java
@@ -84,7 +84,6 @@ public final class AllGroupWithPartialPreGroupProperties extends OperatorDescrip
@Override
public LocalProperties computeLocalProperties(LocalProperties lProps) {
- lProps.clearUniqueFieldSets();
- return lProps;
+ return lProps.clearUniqueFieldSets();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupDescriptor.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupDescriptor.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupDescriptor.java
index edc6c69..e288600 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupDescriptor.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupDescriptor.java
@@ -156,7 +156,6 @@ public class CoGroupDescriptor extends OperatorDescriptorDual {
@Override
public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
LocalProperties comb = LocalProperties.combine(in1, in2);
- comb.clearUniqueFieldSets();
- return comb;
+ return comb.clearUniqueFieldSets();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CollectorMapDescriptor.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CollectorMapDescriptor.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CollectorMapDescriptor.java
index ef574c6..ebdcb97 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CollectorMapDescriptor.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CollectorMapDescriptor.java
@@ -62,7 +62,6 @@ public class CollectorMapDescriptor extends OperatorDescriptorSingle {
@Override
public LocalProperties computeLocalProperties(LocalProperties lProps) {
- lProps.clearUniqueFieldSets();
- return lProps;
+ return lProps.clearUniqueFieldSets();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CrossStreamOuterFirstDescriptor.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CrossStreamOuterFirstDescriptor.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CrossStreamOuterFirstDescriptor.java
index fa2559e..3a6ad15 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CrossStreamOuterFirstDescriptor.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CrossStreamOuterFirstDescriptor.java
@@ -40,9 +40,9 @@ public class CrossStreamOuterFirstDescriptor extends CartesianProductDescriptor
if ((in1.getGroupedFields() == null || in1.getGroupedFields().size() == 0) &&
in1.getUniqueFields() != null && in1.getUniqueFields().size() > 0)
{
- in1.setGroupedFields(in1.getUniqueFields().iterator().next().toFieldList());
+ return LocalProperties.forGrouping(in1.getUniqueFields().iterator().next().toFieldList());
+ } else {
+ return in1.clearUniqueFieldSets();
}
- in1.clearUniqueFieldSets();
- return in1;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CrossStreamOuterSecondDescriptor.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CrossStreamOuterSecondDescriptor.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CrossStreamOuterSecondDescriptor.java
index 4a2c9e2..7a81c72 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CrossStreamOuterSecondDescriptor.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CrossStreamOuterSecondDescriptor.java
@@ -40,9 +40,9 @@ public class CrossStreamOuterSecondDescriptor extends CartesianProductDescriptor
if ((in2.getGroupedFields() == null || in2.getGroupedFields().size() == 0) &&
in2.getUniqueFields() != null && in2.getUniqueFields().size() > 0)
{
- in2.setGroupedFields(in2.getUniqueFields().iterator().next().toFieldList());
+ return LocalProperties.forGrouping(in2.getUniqueFields().iterator().next().toFieldList());
+ } else {
+ return in2.clearUniqueFieldSets();
}
- in2.clearUniqueFieldSets();
- return in2;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/FlatMapDescriptor.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/FlatMapDescriptor.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/FlatMapDescriptor.java
index 807fe88..c81be04 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/FlatMapDescriptor.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/FlatMapDescriptor.java
@@ -62,7 +62,6 @@ public class FlatMapDescriptor extends OperatorDescriptorSingle {
@Override
public LocalProperties computeLocalProperties(LocalProperties lProps) {
- lProps.clearUniqueFieldSets();
- return lProps;
+ return lProps.clearUniqueFieldSets();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/GroupReduceProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/GroupReduceProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/GroupReduceProperties.java
index 72f05ee..5cddf62 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/GroupReduceProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/GroupReduceProperties.java
@@ -99,7 +99,6 @@ public final class GroupReduceProperties extends OperatorDescriptorSingle {
@Override
public LocalProperties computeLocalProperties(LocalProperties lProps) {
- lProps.clearUniqueFieldSets();
- return lProps;
+ return lProps.clearUniqueFieldSets();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/GroupReduceWithCombineProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/GroupReduceWithCombineProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/GroupReduceWithCombineProperties.java
index 496ab01..980cf6d 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/GroupReduceWithCombineProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/GroupReduceWithCombineProperties.java
@@ -129,7 +129,6 @@ public final class GroupReduceWithCombineProperties extends OperatorDescriptorSi
@Override
public LocalProperties computeLocalProperties(LocalProperties lProps) {
- lProps.clearUniqueFieldSets();
- return lProps;
+ return lProps.clearUniqueFieldSets();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/PartialGroupProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/PartialGroupProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/PartialGroupProperties.java
index f4f090d..a28feeb 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/PartialGroupProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/PartialGroupProperties.java
@@ -74,7 +74,6 @@ public final class PartialGroupProperties extends OperatorDescriptorSingle {
@Override
public LocalProperties computeLocalProperties(LocalProperties lProps) {
- lProps.clearUniqueFieldSets();
- return lProps;
+ return lProps.clearUniqueFieldSets();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/ReduceProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/ReduceProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/ReduceProperties.java
index ec4d95c..4539da5 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/ReduceProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/ReduceProperties.java
@@ -96,7 +96,6 @@ public final class ReduceProperties extends OperatorDescriptorSingle {
@Override
public LocalProperties computeLocalProperties(LocalProperties lProps) {
- lProps.clearUniqueFieldSets();
- return lProps;
+ return lProps.clearUniqueFieldSets();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/SortMergeJoinDescriptor.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/SortMergeJoinDescriptor.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/SortMergeJoinDescriptor.java
index 17a8bd0..f5d83f9 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/SortMergeJoinDescriptor.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/SortMergeJoinDescriptor.java
@@ -89,7 +89,6 @@ public class SortMergeJoinDescriptor extends AbstractJoinDescriptor
@Override
public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
LocalProperties comb = LocalProperties.combine(in1, in2);
- comb.clearUniqueFieldSets();
- return comb;
+ return comb.clearUniqueFieldSets();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/BulkPartialSolutionPlanNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/BulkPartialSolutionPlanNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/BulkPartialSolutionPlanNode.java
index ae3dc63..2281a5c 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/BulkPartialSolutionPlanNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/BulkPartialSolutionPlanNode.java
@@ -19,7 +19,6 @@ import static eu.stratosphere.compiler.plan.PlanNode.SourceAndDamReport.NOT_FOUN
import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
import eu.stratosphere.compiler.costs.Costs;
import eu.stratosphere.compiler.dag.BulkPartialSolutionNode;
@@ -91,13 +90,13 @@ public class BulkPartialSolutionPlanNode extends PlanNode {
}
@Override
- public Iterator<PlanNode> getPredecessors() {
- return Collections.<PlanNode>emptyList().iterator();
+ public Iterable<PlanNode> getPredecessors() {
+ return Collections.<PlanNode>emptyList();
}
@Override
- public Iterator<Channel> getInputs() {
- return Collections.<Channel>emptyList().iterator();
+ public Iterable<Channel> getInputs() {
+ return Collections.<Channel>emptyList();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java
index 1cd6a36..8fe95c9 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java
@@ -386,8 +386,7 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
break;
case SORT:
case COMBININGSORT:
- this.localProps = new LocalProperties();
- this.localProps.setOrdering(Utils.createOrdering(this.localKeys, this.localSortOrder));
+ this.localProps = LocalProperties.forOrdering(Utils.createOrdering(this.localKeys, this.localSortOrder));
break;
default:
throw new CompilerException("Unsupported local strategy for channel.");
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/DualInputPlanNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/DualInputPlanNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/DualInputPlanNode.java
index 9fe9e2a..8ad32a8 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/DualInputPlanNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/DualInputPlanNode.java
@@ -18,9 +18,8 @@ import static eu.stratosphere.compiler.plan.PlanNode.SourceAndDamReport.FOUND_SO
import static eu.stratosphere.compiler.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
import java.util.ArrayList;
-import java.util.Iterator;
+import java.util.Arrays;
import java.util.List;
-import java.util.NoSuchElementException;
import eu.stratosphere.api.common.operators.util.FieldList;
import eu.stratosphere.api.common.typeutils.TypeComparatorFactory;
@@ -168,31 +167,9 @@ public class DualInputPlanNode extends PlanNode {
@Override
- public Iterator<PlanNode> getPredecessors() {
+ public Iterable<PlanNode> getPredecessors() {
if (getBroadcastInputs() == null || getBroadcastInputs().isEmpty()) {
- return new Iterator<PlanNode>() {
- private int hasLeft = 2;
- @Override
- public boolean hasNext() {
- return this.hasLeft > 0;
- }
- @Override
- public PlanNode next() {
- if (this.hasLeft == 2) {
- this.hasLeft = 1;
- return DualInputPlanNode.this.input1.getSource();
- } else if (this.hasLeft == 1) {
- this.hasLeft = 0;
- return DualInputPlanNode.this.input2.getSource();
- } else {
- throw new NoSuchElementException();
- }
- }
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
+ return Arrays.asList(this.input1.getSource(), this.input2.getSource());
} else {
List<PlanNode> preds = new ArrayList<PlanNode>();
@@ -203,36 +180,13 @@ public class DualInputPlanNode extends PlanNode {
preds.add(c.getSource());
}
- return preds.iterator();
+ return preds;
}
}
-
@Override
- public Iterator<Channel> getInputs() {
- return new Iterator<Channel>() {
- private int hasLeft = 2;
- @Override
- public boolean hasNext() {
- return this.hasLeft > 0;
- }
- @Override
- public Channel next() {
- if (this.hasLeft == 2) {
- this.hasLeft = 1;
- return DualInputPlanNode.this.input1;
- } else if (this.hasLeft == 1) {
- this.hasLeft = 0;
- return DualInputPlanNode.this.input2;
- } else {
- throw new NoSuchElementException();
- }
- }
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
+ public Iterable<Channel> getInputs() {
+ return Arrays.asList(this.input1, this.input2);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/NAryUnionPlanNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/NAryUnionPlanNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/NAryUnionPlanNode.java
index b011b8f..8009cbe 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/NAryUnionPlanNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/NAryUnionPlanNode.java
@@ -21,6 +21,7 @@ import eu.stratosphere.compiler.dag.BinaryUnionNode;
import eu.stratosphere.compiler.dataproperties.GlobalProperties;
import eu.stratosphere.compiler.dataproperties.LocalProperties;
import eu.stratosphere.pact.runtime.task.DriverStrategy;
+import eu.stratosphere.util.IterableIterator;
import eu.stratosphere.util.Visitor;
/**
@@ -55,14 +56,14 @@ public class NAryUnionPlanNode extends PlanNode {
}
@Override
- public Iterator<Channel> getInputs() {
- return Collections.unmodifiableList(this.inputs).iterator();
+ public Iterable<Channel> getInputs() {
+ return Collections.unmodifiableList(this.inputs);
}
@Override
- public Iterator<PlanNode> getPredecessors() {
+ public Iterable<PlanNode> getPredecessors() {
final Iterator<Channel> channels = this.inputs.iterator();
- return new Iterator<PlanNode>() {
+ return new IterableIterator<PlanNode>() {
@Override
public boolean hasNext() {
@@ -78,6 +79,11 @@ public class NAryUnionPlanNode extends PlanNode {
public void remove() {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public Iterator<PlanNode> iterator() {
+ return this;
+ }
};
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java
index 4f2c049..16241e4 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java
@@ -15,7 +15,6 @@ package eu.stratosphere.compiler.plan;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -275,8 +274,9 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
this.cumulativeCosts = nodeCosts.clone();
// add all the normal inputs
- for (Iterator<PlanNode> preds = getPredecessors(); preds.hasNext();) {
- Costs parentCosts = preds.next().getCumulativeCostsShare();
+ for (PlanNode pred : getPredecessors()) {
+
+ Costs parentCosts = pred.getCumulativeCostsShare();
if (parentCosts != null) {
this.cumulativeCosts.addCosts(parentCosts);
} else {
@@ -325,10 +325,10 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
// Input, Predecessors, Successors
// --------------------------------------------------------------------------------------------
- public abstract Iterator<Channel> getInputs();
+ public abstract Iterable<Channel> getInputs();
@Override
- public abstract Iterator<PlanNode> getPredecessors();
+ public abstract Iterable<PlanNode> getPredecessors();
/**
* Sets a list of all broadcast inputs attached to this node.
@@ -396,7 +396,7 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
}
for (FieldSet fields : uniqueFieldCombinations) {
this.globalProps.addUniqueFieldCombination(fields);
- this.localProps.addUniqueFields(fields);
+ this.localProps = this.localProps.addUniqueFields(fields);
}
}
@@ -461,20 +461,22 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
@Override
- public Iterator<DumpableConnection<PlanNode>> getDumpableInputs() {
+ public Iterable<DumpableConnection<PlanNode>> getDumpableInputs() {
List<DumpableConnection<PlanNode>> allInputs = new ArrayList<DumpableConnection<PlanNode>>();
- for (Iterator<Channel> inputs = getInputs(); inputs.hasNext();) {
- allInputs.add(inputs.next());
+ for (Channel c : getInputs()) {
+ allInputs.add(c);
}
for (NamedChannel c : getBroadcastInputs()) {
allInputs.add(c);
}
- return allInputs.iterator();
+ return allInputs;
}
+ // --------------------------------------------------------------------------------------------
+
public static enum SourceAndDamReport {
NOT_FOUND, FOUND_SOURCE, FOUND_SOURCE_AND_DAM;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/SingleInputPlanNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/SingleInputPlanNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/SingleInputPlanNode.java
index 7b7b429..29e52ab 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/SingleInputPlanNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/SingleInputPlanNode.java
@@ -18,9 +18,8 @@ import static eu.stratosphere.compiler.plan.PlanNode.SourceAndDamReport.FOUND_SO
import static eu.stratosphere.compiler.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
import java.util.ArrayList;
-import java.util.Iterator;
+import java.util.Collections;
import java.util.List;
-import java.util.NoSuchElementException;
import eu.stratosphere.api.common.operators.util.FieldList;
import eu.stratosphere.api.common.typeutils.TypeComparatorFactory;
@@ -150,68 +149,28 @@ public class SingleInputPlanNode extends PlanNode {
@Override
- public Iterator<PlanNode> getPredecessors() {
+ public Iterable<PlanNode> getPredecessors() {
if (getBroadcastInputs() == null || getBroadcastInputs().isEmpty()) {
- return new Iterator<PlanNode>() {
- private boolean hasLeft = true;
- @Override
- public boolean hasNext() {
- return this.hasLeft;
- }
- @Override
- public PlanNode next() {
- if (this.hasLeft) {
- this.hasLeft = false;
- return SingleInputPlanNode.this.input.getSource();
- } else {
- throw new NoSuchElementException();
- }
- }
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
+ return Collections.singleton(this.input.getSource());
}
else {
List<PlanNode> preds = new ArrayList<PlanNode>();
-
preds.add(input.getSource());
for (Channel c : getBroadcastInputs()) {
preds.add(c.getSource());
}
- return preds.iterator();
+ return preds;
}
}
@Override
- public Iterator<Channel> getInputs() {
- return new Iterator<Channel>() {
- private boolean hasLeft = true;
- @Override
- public boolean hasNext() {
- return this.hasLeft;
- }
- @Override
- public Channel next() {
- if (this.hasLeft) {
- this.hasLeft = false;
- return SingleInputPlanNode.this.input;
- } else {
- throw new NoSuchElementException();
- }
- }
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
+ public Iterable<Channel> getInputs() {
+ return Collections.singleton(this.input);
}
-
@Override
public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
if (source == this) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/SolutionSetPlanNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/SolutionSetPlanNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/SolutionSetPlanNode.java
index 4f47093..ee54c5a 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/SolutionSetPlanNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/SolutionSetPlanNode.java
@@ -19,7 +19,6 @@ import static eu.stratosphere.compiler.plan.PlanNode.SourceAndDamReport.NOT_FOUN
import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
import eu.stratosphere.compiler.costs.Costs;
import eu.stratosphere.compiler.dag.OptimizerNode;
@@ -92,14 +91,14 @@ public class SolutionSetPlanNode extends PlanNode {
@Override
- public Iterator<PlanNode> getPredecessors() {
- return Collections.<PlanNode>emptyList().iterator();
+ public Iterable<PlanNode> getPredecessors() {
+ return Collections.<PlanNode>emptyList();
}
@Override
- public Iterator<Channel> getInputs() {
- return Collections.<Channel>emptyList().iterator();
+ public Iterable<Channel> getInputs() {
+ return Collections.<Channel>emptyList();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/SourcePlanNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/SourcePlanNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/SourcePlanNode.java
index 2e7d86a..b0900d9 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/SourcePlanNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/SourcePlanNode.java
@@ -17,7 +17,6 @@ import static eu.stratosphere.compiler.plan.PlanNode.SourceAndDamReport.FOUND_SO
import static eu.stratosphere.compiler.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
import java.util.Collections;
-import java.util.Iterator;
import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
import eu.stratosphere.compiler.dag.DataSourceNode;
@@ -82,14 +81,14 @@ public class SourcePlanNode extends PlanNode {
@Override
- public Iterator<PlanNode> getPredecessors() {
- return Collections.<PlanNode>emptyList().iterator();
+ public Iterable<PlanNode> getPredecessors() {
+ return Collections.<PlanNode>emptyList();
}
@Override
- public Iterator<Channel> getInputs() {
- return Collections.<Channel>emptyList().iterator();
+ public Iterable<Channel> getInputs() {
+ return Collections.<Channel>emptyList();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/WorksetPlanNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/WorksetPlanNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/WorksetPlanNode.java
index 851e269..936eb67 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/WorksetPlanNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/WorksetPlanNode.java
@@ -19,7 +19,6 @@ import static eu.stratosphere.compiler.plan.PlanNode.SourceAndDamReport.NOT_FOUN
import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
import eu.stratosphere.compiler.costs.Costs;
import eu.stratosphere.compiler.dag.OptimizerNode;
@@ -93,14 +92,14 @@ public class WorksetPlanNode extends PlanNode {
@Override
- public Iterator<PlanNode> getPredecessors() {
- return Collections.<PlanNode>emptyList().iterator();
+ public Iterable<PlanNode> getPredecessors() {
+ return Collections.<PlanNode>emptyList();
}
@Override
- public Iterator<Channel> getInputs() {
- return Collections.<Channel>emptyList().iterator();
+ public Iterable<Channel> getInputs() {
+ return Collections.<Channel>emptyList();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/DumpableNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/DumpableNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/DumpableNode.java
index 5ff3a8a..56f14ee 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/DumpableNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/DumpableNode.java
@@ -12,8 +12,6 @@
**********************************************************************************************************************/
package eu.stratosphere.compiler.plandump;
-import java.util.Iterator;
-
import eu.stratosphere.compiler.dag.OptimizerNode;
import eu.stratosphere.compiler.plan.PlanNode;
@@ -27,9 +25,9 @@ public interface DumpableNode<T extends DumpableNode<T>> {
*
* @return An iterator over the predecessors.
*/
- Iterator<T> getPredecessors();
+ Iterable<T> getPredecessors();
- Iterator<DumpableConnection<T>> getDumpableInputs();
+ Iterable<DumpableConnection<T>> getDumpableInputs();
OptimizerNode getOptimizerNode();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/PlanJSONDumpGenerator.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/PlanJSONDumpGenerator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/PlanJSONDumpGenerator.java
index e89a18b..83cb502 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/PlanJSONDumpGenerator.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plandump/PlanJSONDumpGenerator.java
@@ -142,8 +142,7 @@ public class PlanJSONDumpGenerator {
this.nodeIds.put(node, this.nodeCnt++);
// then recurse
- for (Iterator<? extends DumpableNode<?>> children = node.getPredecessors(); children.hasNext(); ) {
- final DumpableNode<?> child = children.next();
+ for (DumpableNode<?> child : node.getPredecessors()) {
visit(child, writer, first);
first = false;
}
@@ -254,7 +253,7 @@ public class PlanJSONDumpGenerator {
+ (n.getSubtasksPerInstance() >= 1 ? n.getSubtasksPerInstance() : "default") + "\"");
// output node predecessors
- Iterator<? extends DumpableConnection<?>> inConns = node.getDumpableInputs();
+ Iterator<? extends DumpableConnection<?>> inConns = node.getDumpableInputs().iterator();
String child1name = "", child2name = "";
if (inConns != null && inConns.hasNext()) {
@@ -270,8 +269,8 @@ public class PlanJSONDumpGenerator {
if (conn.getSource() instanceof NAryUnionPlanNode) {
inConnsForInput = new ArrayList<DumpableConnection<?>>();
- for (Iterator<? extends DumpableConnection<?>> inputOfUnion = conn.getSource().getDumpableInputs(); inputOfUnion.hasNext();) {
- inConnsForInput.add(inputOfUnion.next());
+ for (DumpableConnection<?> inputOfUnion : conn.getSource().getDumpableInputs()) {
+ inConnsForInput.add(inputOfUnion);
}
}
else {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
index aebf2cf..5a30fb6 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -455,7 +455,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
final TaskInChain chainedTask;
if ((chainedTask = this.chainedTasks.get(node)) != null) {
// Chained Task. Sanity check first...
- final Iterator<Channel> inConns = node.getInputs();
+ final Iterator<Channel> inConns = node.getInputs().iterator();
if (!inConns.hasNext()) {
throw new CompilerException("Bug: Found chained task with no input.");
}
@@ -522,7 +522,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
// enclosing iteration node, because the inputs are the initial inputs to the iteration.
final Iterator<Channel> inConns;
if (node instanceof BulkPartialSolutionPlanNode) {
- inConns = ((BulkPartialSolutionPlanNode) node).getContainingIterationNode().getInputs();
+ inConns = ((BulkPartialSolutionPlanNode) node).getContainingIterationNode().getInputs().iterator();
// because the partial solution has its own vertex, is has only one (logical) input.
// note this in the task configuration
targetVertexConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(0);
@@ -536,7 +536,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
targetVertexConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(0);
targetVertexConfig.setIterationHeadSolutionSetInputIndex(1);
} else {
- inConns = node.getInputs();
+ inConns = node.getInputs().iterator();
}
if (!inConns.hasNext()) {
throw new CompilerException("Bug: Found a non-source task with no input.");
@@ -578,7 +578,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
// check if the iteration's input is a union
if (iterationNode.getInput().getSource() instanceof NAryUnionPlanNode) {
- allInChannels = ((NAryUnionPlanNode) iterationNode.getInput().getSource()).getInputs();
+ allInChannels = ((NAryUnionPlanNode) iterationNode.getInput().getSource()).getInputs().iterator();
} else {
allInChannels = Collections.singletonList(iterationNode.getInput()).iterator();
}
@@ -597,7 +597,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
// check if the iteration's input is a union
if (iterationNode.getInput2().getSource() instanceof NAryUnionPlanNode) {
- allInChannels = ((NAryUnionPlanNode) iterationNode.getInput2().getSource()).getInputs();
+ allInChannels = ((NAryUnionPlanNode) iterationNode.getInput2().getSource()).getInputs().iterator();
} else {
allInChannels = Collections.singletonList(iterationNode.getInput2()).iterator();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/postpass/GenericFlatTypePostPass.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/postpass/GenericFlatTypePostPass.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/postpass/GenericFlatTypePostPass.java
index b39a297..38dd7ec 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/postpass/GenericFlatTypePostPass.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/postpass/GenericFlatTypePostPass.java
@@ -13,7 +13,6 @@
package eu.stratosphere.compiler.postpass;
-import java.util.Iterator;
import java.util.Map;
import eu.stratosphere.api.common.operators.util.FieldList;
@@ -415,8 +414,8 @@ public abstract class GenericFlatTypePostPass<X, T extends AbstractSchema<X>> im
else if (node instanceof NAryUnionPlanNode) {
// only propagate the info down
try {
- for (Iterator<Channel> channels = node.getInputs(); channels.hasNext(); ) {
- propagateToChannel(parentSchema, channels.next(), createUtilities);
+ for (Channel channel : node.getInputs()) {
+ propagateToChannel(parentSchema, channel, createUtilities);
}
} catch (MissingFieldTypeInfoException ex) {
throw new CompilerPostPassException("Could not set up runtime strategy for the input channel to " +
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/postpass/JavaApiPostPass.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/postpass/JavaApiPostPass.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/postpass/JavaApiPostPass.java
index 635c48d..24fd8e5 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/postpass/JavaApiPostPass.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/postpass/JavaApiPostPass.java
@@ -16,7 +16,6 @@ package eu.stratosphere.compiler.postpass;
import java.util.Arrays;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.Set;
import eu.stratosphere.api.common.operators.DualInputOperator;
@@ -210,8 +209,8 @@ public class JavaApiPostPass implements OptimizerPostPass {
}
else if (node instanceof NAryUnionPlanNode){
// Traverse to all child channels
- for (Iterator<Channel> channels = node.getInputs(); channels.hasNext(); ) {
- traverseChannel(channels.next());
+ for (Channel channel : node.getInputs()) {
+ traverseChannel(channel);
}
}
else {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java
index 24d0493..c2b377e 100644
--- a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java
@@ -81,7 +81,7 @@ public class CoGroupSolutionSetFirstTest extends CompilerTestBase {
PlanNode deltaNode = ((WorksetIterationPlanNode) visitable).getSolutionSetDeltaPlanNode();
//get the CoGroup
- DualInputPlanNode dpn = (DualInputPlanNode) deltaNode.getInputs().next().getSource();
+ DualInputPlanNode dpn = (DualInputPlanNode) deltaNode.getInputs().iterator().next().getSource();
Channel in1 = dpn.getInput1();
Channel in2 = dpn.getInput2();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/UnionPropertyPropagationTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/UnionPropertyPropagationTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/UnionPropertyPropagationTest.java
index b7b2d5c..bcbb6b1 100644
--- a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/UnionPropertyPropagationTest.java
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/UnionPropertyPropagationTest.java
@@ -83,8 +83,7 @@ public class UnionPropertyPropagationTest extends CompilerTestBase {
@Override
public boolean preVisit(PlanNode visitable) {
if (visitable instanceof SingleInputPlanNode && visitable.getPactContract() instanceof ReduceOperator) {
- for (Iterator<Channel> inputs = visitable.getInputs(); inputs.hasNext();) {
- final Channel inConn = inputs.next();
+ for (Channel inConn : visitable.getInputs()) {
Assert.assertTrue("Reduce should just forward the input if it is already partitioned",
inConn.getShipStrategy() == ShipStrategyType.FORWARD);
}
@@ -148,7 +147,7 @@ public class UnionPropertyPropagationTest extends CompilerTestBase {
*/
if (visitable instanceof NAryUnionPlanNode) {
int numberInputs = 0;
- for (Iterator<Channel> inputs = visitable.getInputs(); inputs.hasNext(); numberInputs++) {
+ for (Iterator<Channel> inputs = visitable.getInputs().iterator(); inputs.hasNext(); numberInputs++) {
final Channel inConn = inputs.next();
PlanNode inNode = inConn.getSource();
Assert.assertTrue("Input of Union should be FlatMapOperators",
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ed7c3f15/stratosphere-core/src/main/java/eu/stratosphere/util/IterableIterator.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/util/IterableIterator.java b/stratosphere-core/src/main/java/eu/stratosphere/util/IterableIterator.java
new file mode 100644
index 0000000..16f610a
--- /dev/null
+++ b/stratosphere-core/src/main/java/eu/stratosphere/util/IterableIterator.java
@@ -0,0 +1,26 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.util;
+
+import java.util.Iterator;
+
+/**
+ * An {@link Iterator} that is also {@link Iterable} (often by returning itself, which makes it single-pass).
+ *
+ * @param <E> The iterated elements' type.
+ */
+public interface IterableIterator<E> extends Iterator<E>, Iterable<E> {
+}