Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:06:48 UTC
[09/53] [abbrv] flink git commit: [optimizer] Rename optimizer
project to "flink-optimizer" (previously flink-compiler)
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/InterestingPropertyVisitor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/InterestingPropertyVisitor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/InterestingPropertyVisitor.java
new file mode 100644
index 0000000..c2e81a8
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/InterestingPropertyVisitor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.traversals;
+
+import org.apache.flink.optimizer.costs.CostEstimator;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.util.Visitor;
+
+/**
+ * Visitor that computes the interesting properties for each node in the optimizer DAG. On its recursive
+ * depth-first descent, it propagates all interesting properties top-down.
+ */
+public class InterestingPropertyVisitor implements Visitor<OptimizerNode> {
+
+	private final CostEstimator estimator; // used to compute the maximal costs for an interesting property
+
+ /**
+ * Creates a new visitor that computes the interesting properties for all nodes in the plan.
+ * It uses the given cost estimator to compute the maximal costs for an interesting property.
+ *
+ * @param estimator
+ * The cost estimator to estimate the maximal costs for interesting properties.
+ */
+ public InterestingPropertyVisitor(CostEstimator estimator) {
+ this.estimator = estimator;
+ }
+
+ @Override
+ public boolean preVisit(OptimizerNode node) {
+		// The interesting properties must be computed on the descent. In case a node has multiple outputs,
+		// that computation must happen during the last descent.
+
+ if (node.getInterestingProperties() == null && node.haveAllOutputConnectionInterestingProperties()) {
+ node.computeUnionOfInterestingPropertiesFromSuccessors();
+ node.computeInterestingPropertiesForInputs(this.estimator);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public void postVisit(OptimizerNode visitable) {}
+}
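For reference, a minimal sketch of how such a visitor is typically driven (the names
rootNode and costEstimator are placeholders for the optimizer's DAG root and cost
estimator, which this excerpt does not show):

    // Kick off the top-down propagation of interesting properties from the
    // root of the optimizer DAG. preVisit() defers the computation for nodes
    // with multiple outputs until all their outgoing connections are known.
    InterestingPropertyVisitor propsVisitor = new InterestingPropertyVisitor(costEstimator);
    rootNode.accept(propsVisitor);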
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/PlanFinalizer.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/PlanFinalizer.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/PlanFinalizer.java
new file mode 100644
index 0000000..58aa3c1
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/PlanFinalizer.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.traversals;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.dag.TempMode;
+import org.apache.flink.optimizer.plan.BinaryUnionPlanNode;
+import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
+import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.IterationPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.SolutionSetPlanNode;
+import org.apache.flink.optimizer.plan.SourcePlanNode;
+import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
+import org.apache.flink.optimizer.plan.WorksetPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.util.Visitor;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This visitor traverses the selected execution plan and finalizes it:
+ *
+ * <ul>
+ * <li>The graph of nodes is double-linked (links from child to parent are inserted).</li>
+ * <li>If unions join static and dynamic paths, the cache is marked as a memory consumer.</li>
+ * <li>Relative memory fractions are assigned to all nodes.</li>
+ * <li>All nodes are collected into a set.</li>
+ * </ul>
+ */
+public class PlanFinalizer implements Visitor<PlanNode> {
+
+ private final Set<PlanNode> allNodes; // a set of all nodes in the optimizer plan
+
+ private final List<SourcePlanNode> sources; // all data source nodes in the optimizer plan
+
+ private final List<SinkPlanNode> sinks; // all data sink nodes in the optimizer plan
+
+ private final Deque<IterationPlanNode> stackOfIterationNodes;
+
+	private int memoryConsumerWeights; // the sum of the weights of all memory consumers
+
+ /**
+ * Creates a new plan finalizer.
+ */
+ public PlanFinalizer() {
+ this.allNodes = new HashSet<PlanNode>();
+ this.sources = new ArrayList<SourcePlanNode>();
+ this.sinks = new ArrayList<SinkPlanNode>();
+ this.stackOfIterationNodes = new ArrayDeque<IterationPlanNode>();
+ }
+
+ public OptimizedPlan createFinalPlan(List<SinkPlanNode> sinks, String jobName, Plan originalPlan) {
+ this.memoryConsumerWeights = 0;
+
+ // traverse the graph
+ for (SinkPlanNode node : sinks) {
+ node.accept(this);
+ }
+
+ // assign the memory to each node
+ if (this.memoryConsumerWeights > 0) {
+ for (PlanNode node : this.allNodes) {
+ // assign memory to the driver strategy of the node
+ final int consumerWeight = node.getMemoryConsumerWeight();
+ if (consumerWeight > 0) {
+ final double relativeMem = (double)consumerWeight / this.memoryConsumerWeights;
+ node.setRelativeMemoryPerSubtask(relativeMem);
+ if (Optimizer.LOG.isDebugEnabled()) {
+ Optimizer.LOG.debug("Assigned " + relativeMem + " of total memory to each subtask of " +
+ node.getProgramOperator().getName() + ".");
+ }
+ }
+
+ // assign memory to the local and global strategies of the channels
+ for (Channel c : node.getInputs()) {
+ if (c.getLocalStrategy().dams()) {
+ final double relativeMem = 1.0 / this.memoryConsumerWeights;
+ c.setRelativeMemoryLocalStrategy(relativeMem);
+ if (Optimizer.LOG.isDebugEnabled()) {
+ Optimizer.LOG.debug("Assigned " + relativeMem + " of total memory to each local strategy " +
+ "instance of " + c + ".");
+ }
+ }
+ if (c.getTempMode() != TempMode.NONE) {
+					final double relativeMem = 1.0 / this.memoryConsumerWeights;
+ c.setRelativeTempMemory(relativeMem);
+ if (Optimizer.LOG.isDebugEnabled()) {
+ Optimizer.LOG.debug("Assigned " + relativeMem + " of total memory to each instance of the temp " +
+ "table for " + c + ".");
+ }
+ }
+ }
+ }
+ }
+ return new OptimizedPlan(this.sources, this.sinks, this.allNodes, jobName, originalPlan);
+ }
+
+ @Override
+ public boolean preVisit(PlanNode visitable) {
+		// if we come here again, prevent a further descent
+ if (!this.allNodes.add(visitable)) {
+ return false;
+ }
+
+ if (visitable instanceof SinkPlanNode) {
+ this.sinks.add((SinkPlanNode) visitable);
+ }
+ else if (visitable instanceof SourcePlanNode) {
+ this.sources.add((SourcePlanNode) visitable);
+ }
+ else if (visitable instanceof BinaryUnionPlanNode) {
+ BinaryUnionPlanNode unionNode = (BinaryUnionPlanNode) visitable;
+ if (unionNode.unionsStaticAndDynamicPath()) {
+ unionNode.setDriverStrategy(DriverStrategy.UNION_WITH_CACHED);
+ }
+ }
+ else if (visitable instanceof BulkPartialSolutionPlanNode) {
+ // tell the partial solution about the iteration node that contains it
+ final BulkPartialSolutionPlanNode pspn = (BulkPartialSolutionPlanNode) visitable;
+ final IterationPlanNode iteration = this.stackOfIterationNodes.peekLast();
+
+ // sanity check!
+ if (iteration == null || !(iteration instanceof BulkIterationPlanNode)) {
+ throw new CompilerException("Bug: Error finalizing the plan. " +
+					"Cannot associate the node for a partial solution with its containing iteration.");
+ }
+ pspn.setContainingIterationNode((BulkIterationPlanNode) iteration);
+ }
+ else if (visitable instanceof WorksetPlanNode) {
+			// tell the workset node about the iteration node that contains it
+ final WorksetPlanNode wspn = (WorksetPlanNode) visitable;
+ final IterationPlanNode iteration = this.stackOfIterationNodes.peekLast();
+
+ // sanity check!
+ if (iteration == null || !(iteration instanceof WorksetIterationPlanNode)) {
+ throw new CompilerException("Bug: Error finalizing the plan. " +
+					"Cannot associate the node for a partial solution with its containing iteration.");
+ }
+ wspn.setContainingIterationNode((WorksetIterationPlanNode) iteration);
+ }
+ else if (visitable instanceof SolutionSetPlanNode) {
+			// tell the solution set node about the iteration node that contains it
+ final SolutionSetPlanNode sspn = (SolutionSetPlanNode) visitable;
+ final IterationPlanNode iteration = this.stackOfIterationNodes.peekLast();
+
+ // sanity check!
+ if (iteration == null || !(iteration instanceof WorksetIterationPlanNode)) {
+ throw new CompilerException("Bug: Error finalizing the plan. " +
+					"Cannot associate the node for a partial solution with its containing iteration.");
+ }
+ sspn.setContainingIterationNode((WorksetIterationPlanNode) iteration);
+ }
+
+		// double-connect the channels. Previously, only parents knew their children, because
+		// one child candidate could have been referenced by multiple parents.
+ for (Channel conn : visitable.getInputs()) {
+ conn.setTarget(visitable);
+ conn.getSource().addOutgoingChannel(conn);
+ }
+
+ for (Channel c : visitable.getBroadcastInputs()) {
+ c.setTarget(visitable);
+ c.getSource().addOutgoingChannel(c);
+ }
+
+ // count the memory consumption
+ this.memoryConsumerWeights += visitable.getMemoryConsumerWeight();
+ for (Channel c : visitable.getInputs()) {
+ if (c.getLocalStrategy().dams()) {
+ this.memoryConsumerWeights++;
+ }
+ if (c.getTempMode() != TempMode.NONE) {
+ this.memoryConsumerWeights++;
+ }
+ }
+ for (Channel c : visitable.getBroadcastInputs()) {
+ if (c.getLocalStrategy().dams()) {
+ this.memoryConsumerWeights++;
+ }
+ if (c.getTempMode() != TempMode.NONE) {
+ this.memoryConsumerWeights++;
+ }
+ }
+
+		// pass the visitor to the iteration's step function
+ if (visitable instanceof IterationPlanNode) {
+ // push the iteration node onto the stack
+ final IterationPlanNode iterNode = (IterationPlanNode) visitable;
+ this.stackOfIterationNodes.addLast(iterNode);
+
+ // recurse
+ ((IterationPlanNode) visitable).acceptForStepFunction(this);
+
+ // pop the iteration node from the stack
+ this.stackOfIterationNodes.removeLast();
+ }
+ return true;
+ }
+
+ @Override
+ public void postVisit(PlanNode visitable) {}
+}
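A sketch of how the finalizer is applied to the selected plan candidate
(bestPlanSinks and program are placeholders for the optimizer's state). The
relative memory fractions follow directly from the weights: with two consumers
of weight 1 each, every consumer receives a fraction of 0.5.

    // Double-link the plan, collect all nodes, and distribute the managed
    // memory proportionally to the registered consumer weights.
    OptimizedPlan finalPlan = new PlanFinalizer()
            .createFinalPlan(bestPlanSinks, program.getJobName(), program);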
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/StaticDynamicPathIdentifier.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/StaticDynamicPathIdentifier.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/StaticDynamicPathIdentifier.java
new file mode 100644
index 0000000..c0dc4dd
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/StaticDynamicPathIdentifier.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.traversals;
+
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.dag.IterationNode;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.util.Visitor;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A traversal that goes over the program data flow of an iteration and marks the nodes
+ * that depend on the partial solution (the data set recomputed in each iteration) as "dynamic"
+ * and the other nodes as "static".
+ */
+public class StaticDynamicPathIdentifier implements Visitor<OptimizerNode> {
+
+ private final Set<OptimizerNode> seenBefore = new HashSet<OptimizerNode>();
+
+ private final int costWeight;
+
+ public StaticDynamicPathIdentifier(int costWeight) {
+ this.costWeight = costWeight;
+ }
+
+ @Override
+ public boolean preVisit(OptimizerNode visitable) {
+ return this.seenBefore.add(visitable);
+ }
+
+ @Override
+ public void postVisit(OptimizerNode visitable) {
+ visitable.identifyDynamicPath(this.costWeight);
+
+ // check that there is no nested iteration on the dynamic path
+ if (visitable.isOnDynamicPath() && visitable instanceof IterationNode) {
+ throw new CompilerException("Nested iterations are currently not supported.");
+ }
+ }
+}
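A sketch of the intended use, assuming the caller holds the root of an
iteration's step function (stepFunctionRoot is a placeholder; costWeight would
typically reflect the expected number of iterations):

    // Mark everything that depends on the partial solution as "dynamic" and
    // weight its costs, so that static parts can be cached outside the loop.
    StaticDynamicPathIdentifier identifier = new StaticDynamicPathIdentifier(costWeight);
    stepFunctionRoot.accept(identifier);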
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/StepFunctionValidator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/StepFunctionValidator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/StepFunctionValidator.java
new file mode 100644
index 0000000..d359490
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/StepFunctionValidator.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.traversals;
+
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.base.DeltaIterationBase;
+import org.apache.flink.util.Visitor;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A traversal that checks if the Workset of a delta iteration is used in the data flow
+ * of its step function.
+ */
+public class StepFunctionValidator implements Visitor<Operator<?>> {
+
+ private final Set<Operator<?>> seenBefore = new HashSet<Operator<?>>();
+
+ private boolean foundWorkset;
+
+ @Override
+ public boolean preVisit(Operator<?> visitable) {
+ if (visitable instanceof DeltaIterationBase.WorksetPlaceHolder) {
+ foundWorkset = true;
+ }
+
+ return (!foundWorkset) && seenBefore.add(visitable);
+ }
+
+ @Override
+ public void postVisit(Operator<?> visitable) {}
+
+ public boolean hasFoundWorkset() {
+ return foundWorkset;
+ }
+}
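A sketch of the check this visitor supports (the deltaIteration variable is
illustrative; DeltaIterationBase#getNextWorkset() returns the root operator of
the step function's next workset):

    // Verify that the delta iteration's step function actually reads the workset.
    StepFunctionValidator validator = new StepFunctionValidator();
    deltaIteration.getNextWorkset().accept(validator);
    if (!validator.hasFoundWorkset()) {
        throw new CompilerException(
                "The workset of the delta iteration is not used in its step function.");
    }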
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/package-info.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/package-info.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/package-info.java
new file mode 100644
index 0000000..cd8766c
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains the various traversals over the program plan and the
+ * optimizer DAG (directed acyclic graph) that are made in the course of
+ * the optimization.
+ *
+ * The traversals are mostly implemented as a {@link org.apache.flink.util.Visitor} that
+ * traverses the program flow.
+ */
+package org.apache.flink.optimizer.traversals;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/util/NoOpBinaryUdfOp.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/util/NoOpBinaryUdfOp.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/util/NoOpBinaryUdfOp.java
new file mode 100644
index 0000000..5110849
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/util/NoOpBinaryUdfOp.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.util;
+
+import java.util.List;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.NoOpFunction;
+import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.operators.DualInputOperator;
+import org.apache.flink.api.common.operators.RecordOperator;
+import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.types.Key;
+
+
+public class NoOpBinaryUdfOp<OUT> extends DualInputOperator<OUT, OUT, OUT, NoOpFunction> implements RecordOperator {
+
+ public NoOpBinaryUdfOp(TypeInformation<OUT> type) {
+ super(new UserCodeClassWrapper<NoOpFunction>(NoOpFunction.class), new BinaryOperatorInformation<OUT, OUT, OUT>(type, type, type), "NoContract");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Class<? extends Key<?>>[] getKeyClasses() {
+ return (Class<? extends Key<?>>[]) new Class[0];
+ }
+
+ @Override
+ protected List<OUT> executeOnCollections(List<OUT> inputData1, List<OUT> inputData2, RuntimeContext runtimeContext, ExecutionConfig executionConfig) {
+ throw new UnsupportedOperationException();
+ }
+}
+
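A brief instantiation sketch (the element type is illustrative;
BasicTypeInfo.STRING_TYPE_INFO is from org.apache.flink.api.common.typeinfo):

    // A two-input placeholder operator whose inputs and output share one type.
    NoOpBinaryUdfOp<String> binaryNoOp =
            new NoOpBinaryUdfOp<String>(BasicTypeInfo.STRING_TYPE_INFO);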
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/util/NoOpUnaryUdfOp.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/util/NoOpUnaryUdfOp.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/util/NoOpUnaryUdfOp.java
new file mode 100644
index 0000000..cc4a4d6
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/util/NoOpUnaryUdfOp.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.util;
+
+import java.util.List;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.NoOpFunction;
+import org.apache.flink.api.common.operators.RecordOperator;
+import org.apache.flink.api.common.operators.SingleInputOperator;
+import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.types.Key;
+
+
+public class NoOpUnaryUdfOp<OUT> extends SingleInputOperator<OUT, OUT, NoOpFunction> implements RecordOperator {
+
+ @SuppressWarnings("rawtypes")
+ public static final NoOpUnaryUdfOp INSTANCE = new NoOpUnaryUdfOp();
+
+ private NoOpUnaryUdfOp() {
+ // pass null here because we override getOutputType to return type
+ // of input operator
+ super(new UserCodeClassWrapper<NoOpFunction>(NoOpFunction.class), null, "");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Class<? extends Key<?>>[] getKeyClasses() {
+ return (Class<? extends Key<?>>[]) new Class[0];
+ }
+
+ @Override
+ public UnaryOperatorInformation<OUT, OUT> getOperatorInfo() {
+ TypeInformation<OUT> previousOut = input.getOperatorInfo().getOutputType();
+ return new UnaryOperatorInformation<OUT, OUT>(previousOut, previousOut);
+ }
+
+ @Override
+ protected List<OUT> executeOnCollections(List<OUT> inputData, RuntimeContext runtimeContext, ExecutionConfig executionConfig) {
+ return inputData;
+ }
+}
+
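A sketch of why the constructor may pass null operator information: the
singleton derives its type lazily from whatever input it is wired to (the
upstream operator below is hypothetical):

    @SuppressWarnings("unchecked")
    NoOpUnaryUdfOp<String> unaryNoOp = (NoOpUnaryUdfOp<String>) NoOpUnaryUdfOp.INSTANCE;
    unaryNoOp.setInput(upstreamStringOperator);  // hypothetical upstream operator
    // getOperatorInfo() now reports String -> String, taken from the input.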
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/util/Utils.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/util/Utils.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/util/Utils.java
new file mode 100644
index 0000000..d8f33a2
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/util/Utils.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.util;
+
+import java.util.Arrays;
+
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.optimizer.CompilerException;
+
+
+/**
+ * Utility methods for the optimizer, mainly for creating field orderings and sort directions.
+ */
+public class Utils {
+ public static final FieldList createOrderedFromSet(FieldSet set) {
+ if (set instanceof FieldList) {
+ return (FieldList) set;
+ } else {
+ final int[] cols = set.toArray();
+ Arrays.sort(cols);
+ return new FieldList(cols);
+ }
+ }
+
+ public static final Ordering createOrdering(FieldList fields, boolean[] directions) {
+ final Ordering o = new Ordering();
+ for (int i = 0; i < fields.size(); i++) {
+ o.appendOrdering(fields.get(i), null, directions == null || directions[i] ? Order.ASCENDING : Order.DESCENDING);
+ }
+ return o;
+ }
+
+ public static final Ordering createOrdering(FieldList fields) {
+ final Ordering o = new Ordering();
+ for (int i = 0; i < fields.size(); i++) {
+ o.appendOrdering(fields.get(i), null, Order.ANY);
+ }
+ return o;
+ }
+
+ public static boolean[] getDirections(Ordering o, int numFields) {
+ final boolean[] dirs = o.getFieldSortDirections();
+ if (dirs.length == numFields) {
+ return dirs;
+ } else if (dirs.length > numFields) {
+ final boolean[] subSet = new boolean[numFields];
+ System.arraycopy(dirs, 0, subSet, 0, numFields);
+ return subSet;
+ } else {
+			throw new CompilerException("The ordering has fewer sort directions than the requested number of fields.");
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * No instantiation.
+ */
+ private Utils() {}
+}
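A short worked example of the ordering helpers (field indices and sort
directions are illustrative):

    // Order on field 3 ascending, then on field 1 descending.
    Ordering ordering = Utils.createOrdering(new FieldList(3, 1), new boolean[] {true, false});

    // Trim the direction array to the first field only: yields {true}.
    boolean[] dirs = Utils.getDirections(ordering, 1);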
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
new file mode 100644
index 0000000..1e4bafb
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.record.operators.CrossOperator;
+import org.apache.flink.api.java.record.operators.CrossWithLargeOperator;
+import org.apache.flink.api.java.record.operators.CrossWithSmallOperator;
+import org.apache.flink.api.java.record.operators.FileDataSink;
+import org.apache.flink.api.java.record.operators.FileDataSource;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.util.DummyCrossStub;
+import org.apache.flink.optimizer.util.DummyInputFormat;
+import org.apache.flink.optimizer.util.DummyOutputFormat;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.junit.Test;
+
+/**
+ * Tests that validate optimizer choices when using operators that request specific execution
+ * strategies.
+ */
+@SuppressWarnings({"serial", "deprecation"})
+public class AdditionalOperatorsTest extends CompilerTestBase {
+
+ @Test
+ public void testCrossWithSmall() {
+ // construct the plan
+ FileDataSource source1 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 1");
+ FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 2");
+
+ CrossOperator cross = CrossWithSmallOperator.builder(new DummyCrossStub())
+ .input1(source1).input2(source2)
+ .name("Cross").build();
+
+ FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, cross, "Sink");
+
+ Plan plan = new Plan(sink);
+ plan.setDefaultParallelism(DEFAULT_PARALLELISM);
+
+
+ try {
+ OptimizedPlan oPlan = compileNoStats(plan);
+ OptimizerPlanNodeResolver resolver = new OptimizerPlanNodeResolver(oPlan);
+
+ DualInputPlanNode crossPlanNode = resolver.getNode("Cross");
+ Channel in1 = crossPlanNode.getInput1();
+ Channel in2 = crossPlanNode.getInput2();
+
+ assertEquals(ShipStrategyType.FORWARD, in1.getShipStrategy());
+ assertEquals(ShipStrategyType.BROADCAST, in2.getShipStrategy());
+		} catch (CompilerException ce) {
+ ce.printStackTrace();
+ fail("The pact compiler is unable to compile this plan correctly.");
+ }
+ }
+
+ @Test
+ public void testCrossWithLarge() {
+ // construct the plan
+ FileDataSource source1 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 1");
+ FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 2");
+
+		CrossOperator cross = CrossWithLargeOperator.builder(new DummyCrossStub())
+ .input1(source1).input2(source2)
+ .name("Cross").build();
+
+ FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, cross, "Sink");
+
+ Plan plan = new Plan(sink);
+ plan.setDefaultParallelism(DEFAULT_PARALLELISM);
+
+
+ try {
+ OptimizedPlan oPlan = compileNoStats(plan);
+ OptimizerPlanNodeResolver resolver = new OptimizerPlanNodeResolver(oPlan);
+
+ DualInputPlanNode crossPlanNode = resolver.getNode("Cross");
+ Channel in1 = crossPlanNode.getInput1();
+ Channel in2 = crossPlanNode.getInput2();
+
+ assertEquals(ShipStrategyType.BROADCAST, in1.getShipStrategy());
+ assertEquals(ShipStrategyType.FORWARD, in2.getShipStrategy());
+		} catch (CompilerException ce) {
+ ce.printStackTrace();
+ fail("The pact compiler is unable to compile this plan correctly.");
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
new file mode 100644
index 0000000..916aa27
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
@@ -0,0 +1,1039 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
+import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichJoinFunction;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.java.record.operators.BulkIteration;
+import org.apache.flink.api.java.record.operators.CoGroupOperator;
+import org.apache.flink.api.java.record.operators.CrossOperator;
+import org.apache.flink.api.java.record.operators.DeltaIteration;
+import org.apache.flink.api.java.record.operators.FileDataSink;
+import org.apache.flink.api.java.record.operators.FileDataSource;
+import org.apache.flink.api.java.record.operators.JoinOperator;
+import org.apache.flink.api.java.record.operators.MapOperator;
+import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
+import org.apache.flink.optimizer.testfunctions.Top1GroupReducer;
+import org.apache.flink.optimizer.util.DummyCoGroupStub;
+import org.apache.flink.optimizer.util.DummyCrossStub;
+import org.apache.flink.optimizer.util.DummyInputFormat;
+import org.apache.flink.optimizer.util.DummyMatchStub;
+import org.apache.flink.optimizer.util.DummyNonPreservingMatchStub;
+import org.apache.flink.optimizer.util.DummyOutputFormat;
+import org.apache.flink.optimizer.util.IdentityMap;
+import org.apache.flink.optimizer.util.IdentityReduce;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+
+@SuppressWarnings({"serial", "deprecation"})
+public class BranchingPlansCompilerTest extends CompilerTestBase {
+
+
+ @Test
+ public void testCostComputationWithMultipleDataSinks() {
+ final int SINKS = 5;
+
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+ DataSet<Long> source = env.generateSequence(1, 10000);
+
+ DataSet<Long> mappedA = source.map(new IdentityMapper<Long>());
+ DataSet<Long> mappedC = source.map(new IdentityMapper<Long>());
+
+ for (int sink = 0; sink < SINKS; sink++) {
+ mappedA.output(new DiscardingOutputFormat<Long>());
+ mappedC.output(new DiscardingOutputFormat<Long>());
+ }
+
+ Plan plan = env.createProgramPlan("Plans With Multiple Data Sinks");
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ new JobGraphGenerator().compileJobGraph(oPlan);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+
+ /**
+ *
+ * <pre>
+ * (SRC A)
+ * |
+ * (MAP A)
+ * / \
+ * (MAP B) (MAP C)
+ * / / \
+ * (SINK A) (SINK B) (SINK C)
+ * </pre>
+ */
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testBranchingWithMultipleDataSinks2() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+ DataSet<Long> source = env.generateSequence(1, 10000);
+
+ DataSet<Long> mappedA = source.map(new IdentityMapper<Long>());
+ DataSet<Long> mappedB = mappedA.map(new IdentityMapper<Long>());
+ DataSet<Long> mappedC = mappedA.map(new IdentityMapper<Long>());
+
+ mappedB.output(new DiscardingOutputFormat<Long>());
+ mappedC.output(new DiscardingOutputFormat<Long>());
+ mappedC.output(new DiscardingOutputFormat<Long>());
+
+ Plan plan = env.createProgramPlan();
+ Set<Operator<?>> sinks = new HashSet<Operator<?>>(plan.getDataSinks());
+
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ // ---------- check the optimizer plan ----------
+
+ // number of sinks
+ assertEquals("Wrong number of data sinks.", 3, oPlan.getDataSinks().size());
+
+ // remove matching sinks to check relation
+ for (SinkPlanNode sink : oPlan.getDataSinks()) {
+ assertTrue(sinks.remove(sink.getProgramOperator()));
+ }
+ assertTrue(sinks.isEmpty());
+
+ new JobGraphGenerator().compileJobGraph(oPlan);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+
+ /**
+ * <pre>
+ * SINK
+ * |
+ * COGROUP
+ * +---/ \----+
+ * / \
+ * / MATCH10
+ * / | \
+ * / | MATCH9
+ * MATCH5 | | \
+ * | \ | | MATCH8
+ * | MATCH4 | | | \
+ * | | \ | | | MATCH7
+ * | | MATCH3 | | | | \
+ * | | | \ | | | | MATCH6
+ * | | | MATCH2 | | | | | |
+ * | | | | \ +--+--+--+--+--+
+ * | | | | MATCH1 MAP
+ * \ | | | | | /-----------/
+ * (DATA SOURCE ONE)
+ * </pre>
+ */
+ @Test
+ public void testBranchingSourceMultipleTimes() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+ DataSet<Tuple2<Long, Long>> source = env.generateSequence(1, 10000000)
+ .map(new Duplicator<Long>());
+
+ DataSet<Tuple2<Long, Long>> joined1 = source.join(source).where(0).equalTo(0)
+ .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+ DataSet<Tuple2<Long, Long>> joined2 = source.join(joined1).where(0).equalTo(0)
+ .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+ DataSet<Tuple2<Long, Long>> joined3 = source.join(joined2).where(0).equalTo(0)
+ .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+ DataSet<Tuple2<Long, Long>> joined4 = source.join(joined3).where(0).equalTo(0)
+ .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+ DataSet<Tuple2<Long, Long>> joined5 = source.join(joined4).where(0).equalTo(0)
+ .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+ DataSet<Tuple2<Long, Long>> mapped = source.map(
+ new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
+ @Override
+ public Tuple2<Long, Long> map(Tuple2<Long, Long> value) {
+ return null;
+ }
+ });
+
+ DataSet<Tuple2<Long, Long>> joined6 = mapped.join(mapped).where(0).equalTo(0)
+ .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+ DataSet<Tuple2<Long, Long>> joined7 = mapped.join(joined6).where(0).equalTo(0)
+ .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+ DataSet<Tuple2<Long, Long>> joined8 = mapped.join(joined7).where(0).equalTo(0)
+ .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+ DataSet<Tuple2<Long, Long>> joined9 = mapped.join(joined8).where(0).equalTo(0)
+ .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+ DataSet<Tuple2<Long, Long>> joined10 = mapped.join(joined9).where(0).equalTo(0)
+ .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+
+ joined5.coGroup(joined10)
+ .where(1).equalTo(1)
+ .with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>())
+				.output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());
+
+ Plan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = compileNoStats(plan);
+ new JobGraphGenerator().compileJobGraph(oPlan);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ *
+ * <pre>
+	 *
+ * (SINK A)
+ * | (SINK B) (SINK C)
+ * CROSS / /
+ * / \ | +------+
+ * / \ | /
+ * REDUCE MATCH2
+ * | +---/ \
+ * \ / |
+ * MAP |
+ * | |
+ * COGROUP MATCH1
+ * / \ / \
+ * (SRC A) (SRC B) (SRC C)
+ * </pre>
+ */
+ @Test
+ public void testBranchingWithMultipleDataSinks() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+ DataSet<Tuple2<Long, Long>> sourceA = env.generateSequence(1, 10000000)
+ .map(new Duplicator<Long>());
+
+ DataSet<Tuple2<Long, Long>> sourceB = env.generateSequence(1, 10000000)
+ .map(new Duplicator<Long>());
+
+ DataSet<Tuple2<Long, Long>> sourceC = env.generateSequence(1, 10000000)
+ .map(new Duplicator<Long>());
+
+ DataSet<Tuple2<Long, Long>> mapped = sourceA.coGroup(sourceB)
+ .where(0).equalTo(1)
+ .with(new CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
+ @Override
+ public void coGroup(Iterable<Tuple2<Long, Long>> first,
+ Iterable<Tuple2<Long, Long>> second,
+ Collector<Tuple2<Long, Long>> out) {
+ }
+ })
+ .map(new IdentityMapper<Tuple2<Long, Long>>());
+
+ DataSet<Tuple2<Long, Long>> joined = sourceB.join(sourceC)
+ .where(0).equalTo(1)
+ .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+ DataSet<Tuple2<Long, Long>> joined2 = mapped.join(joined)
+ .where(1).equalTo(1)
+ .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+ DataSet<Tuple2<Long, Long>> reduced = mapped
+ .groupBy(1)
+ .reduceGroup(new Top1GroupReducer<Tuple2<Long, Long>>());
+
+ reduced.cross(joined2)
+ .output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());
+
+ joined2.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+ joined2.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+
+ Plan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = compileNoStats(plan);
+ new JobGraphGenerator().compileJobGraph(oPlan);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testBranchEachContractType() {
+ try {
+ // construct the plan
+ FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), "file:///test/file1", "Source A");
+ FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), "file:///test/file2", "Source B");
+ FileDataSource sourceC = new FileDataSource(new DummyInputFormat(), "file:///test/file3", "Source C");
+
+ MapOperator map1 = MapOperator.builder(new IdentityMap()).input(sourceA).name("Map 1").build();
+
+ ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
+ .input(map1)
+ .name("Reduce 1")
+ .build();
+
+ JoinOperator match1 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
+ .input1(sourceB, sourceB, sourceC)
+ .input2(sourceC)
+ .name("Match 1")
+				.build();
+ CoGroupOperator cogroup1 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
+ .input1(sourceA)
+ .input2(sourceB)
+ .name("CoGroup 1")
+ .build();
+
+ CrossOperator cross1 = CrossOperator.builder(new DummyCrossStub())
+ .input1(reduce1)
+ .input2(cogroup1)
+ .name("Cross 1")
+ .build();
+
+
+ CoGroupOperator cogroup2 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
+ .input1(cross1)
+ .input2(cross1)
+ .name("CoGroup 2")
+ .build();
+
+ CoGroupOperator cogroup3 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
+ .input1(map1)
+ .input2(match1)
+ .name("CoGroup 3")
+ .build();
+
+
+ MapOperator map2 = MapOperator.builder(new IdentityMap()).input(cogroup3).name("Map 2").build();
+
+ CoGroupOperator cogroup4 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
+ .input1(map2)
+ .input2(match1)
+ .name("CoGroup 4")
+ .build();
+
+ CoGroupOperator cogroup5 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
+ .input1(cogroup2)
+ .input2(cogroup1)
+ .name("CoGroup 5")
+ .build();
+
+ CoGroupOperator cogroup6 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
+ .input1(reduce1)
+ .input2(cogroup4)
+ .name("CoGroup 6")
+ .build();
+
+ CoGroupOperator cogroup7 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
+ .input1(cogroup5)
+ .input2(cogroup6)
+ .name("CoGroup 7")
+ .build();
+
+ FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, cogroup7);
+ sink.addInput(sourceA);
+ sink.addInput(cogroup3);
+ sink.addInput(cogroup4);
+ sink.addInput(cogroup1);
+
+ // return the PACT plan
+ Plan plan = new Plan(sink, "Branching of each contract type");
+
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ JobGraphGenerator jobGen = new JobGraphGenerator();
+
+ //Compile plan to verify that no error is thrown
+ jobGen.compileJobGraph(oPlan);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+
+ @Test
+ public void testBranchingUnion() {
+ try {
+ // construct the plan
+ FileDataSource source1 = new FileDataSource(new DummyInputFormat(), IN_FILE);
+ FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE);
+
+ JoinOperator mat1 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
+ .input1(source1)
+ .input2(source2)
+ .name("Match 1")
+ .build();
+
+ MapOperator ma1 = MapOperator.builder(new IdentityMap()).input(mat1).name("Map1").build();
+
+ ReduceOperator r1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
+ .input(ma1)
+ .name("Reduce 1")
+ .build();
+
+ ReduceOperator r2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
+ .input(mat1)
+ .name("Reduce 2")
+ .build();
+
+ MapOperator ma2 = MapOperator.builder(new IdentityMap()).input(mat1).name("Map 2").build();
+
+ MapOperator ma3 = MapOperator.builder(new IdentityMap()).input(ma2).name("Map 3").build();
+
+ @SuppressWarnings("unchecked")
+ JoinOperator mat2 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
+ .input1(r1, r2, ma2, ma3)
+ .input2(ma2)
+ .name("Match 2")
+ .build();
+ mat2.setParameter(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_MERGE);
+
+ FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, mat2);
+
+
+ // return the PACT plan
+ Plan plan = new Plan(sink, "Branching Union");
+
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ JobGraphGenerator jobGen = new JobGraphGenerator();
+
+ //Compile plan to verify that no error is thrown
+ jobGen.compileJobGraph(oPlan);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ /**
+ *
+ * <pre>
+ * (SRC A)
+ * / \
+ * (SINK A) (SINK B)
+ * </pre>
+ */
+ @Test
+ public void testBranchingWithMultipleDataSinksSmall() {
+ try {
+ // construct the plan
+ final String out1Path = "file:///test/1";
+ final String out2Path = "file:///test/2";
+
+ FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE);
+
+ FileDataSink sinkA = new FileDataSink(DummyOutputFormat.class, out1Path, sourceA);
+ FileDataSink sinkB = new FileDataSink(DummyOutputFormat.class, out2Path, sourceA);
+
+ List<FileDataSink> sinks = new ArrayList<FileDataSink>();
+ sinks.add(sinkA);
+ sinks.add(sinkB);
+
+ // return the PACT plan
+ Plan plan = new Plan(sinks, "Plans With Multiple Data Sinks");
+
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ // ---------- check the optimizer plan ----------
+
+ // number of sinks
+ Assert.assertEquals("Wrong number of data sinks.", 2, oPlan.getDataSinks().size());
+
+ // sinks contain all sink paths
+ Set<String> allSinks = new HashSet<String>();
+ allSinks.add(out1Path);
+ allSinks.add(out2Path);
+
+ for (SinkPlanNode n : oPlan.getDataSinks()) {
+ String path = ((FileDataSink) n.getSinkNode().getOperator()).getFilePath();
+ Assert.assertTrue("Invalid data sink.", allSinks.remove(path));
+ }
+
+ // ---------- compile plan to nephele job graph to verify that no error is thrown ----------
+
+ JobGraphGenerator jobGen = new JobGraphGenerator();
+ jobGen.compileJobGraph(oPlan);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ /**
+ *
+ * <pre>
+ * (SINK 3) (SINK 1) (SINK 2) (SINK 4)
+ * \ / \ /
+ * (SRC A) (SRC B)
+ * </pre>
+ *
+	 * NOTE: this case is currently not caught by the compiler. We should enable the test once it is caught.
+ */
+ @Test
+ public void testBranchingDisjointPlan() {
+ // construct the plan
+ final String out1Path = "file:///test/1";
+ final String out2Path = "file:///test/2";
+ final String out3Path = "file:///test/3";
+ final String out4Path = "file:///test/4";
+
+ FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE);
+ FileDataSource sourceB = new FileDataSource(DummyInputFormat.class, IN_FILE);
+
+ FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, out1Path, sourceA, "1");
+ FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, out2Path, sourceB, "2");
+ FileDataSink sink3 = new FileDataSink(DummyOutputFormat.class, out3Path, sourceA, "3");
+ FileDataSink sink4 = new FileDataSink(DummyOutputFormat.class, out4Path, sourceB, "4");
+
+
+ List<FileDataSink> sinks = new ArrayList<FileDataSink>();
+ sinks.add(sink1);
+ sinks.add(sink2);
+ sinks.add(sink3);
+ sinks.add(sink4);
+
+ // return the PACT plan
+ Plan plan = new Plan(sinks, "Disjoint plan with multiple data sinks and branches");
+ compileNoStats(plan);
+ }
+
+ @Test
+ public void testBranchAfterIteration() {
+ FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2");
+
+ BulkIteration iteration = new BulkIteration("Loop");
+ iteration.setInput(sourceA);
+ iteration.setMaximumNumberOfIterations(10);
+
+ MapOperator mapper = MapOperator.builder(IdentityMap.class).name("Mapper").input(iteration.getPartialSolution()).build();
+ iteration.setNextPartialSolution(mapper);
+
+ FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Sink 1");
+
+ MapOperator postMap = MapOperator.builder(IdentityMap.class).name("Post Iteration Mapper")
+ .input(iteration).build();
+
+ FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, postMap, "Sink 2");
+
+ List<FileDataSink> sinks = new ArrayList<FileDataSink>();
+ sinks.add(sink1);
+ sinks.add(sink2);
+
+ Plan plan = new Plan(sinks);
+
+ try {
+ compileNoStats(plan);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testBranchBeforeIteration() {
+ FileDataSource source1 = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 1");
+ FileDataSource source2 = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2");
+
+ BulkIteration iteration = new BulkIteration("Loop");
+ iteration.setInput(source2);
+ iteration.setMaximumNumberOfIterations(10);
+
+ MapOperator inMap = MapOperator.builder(new IdentityMap())
+ .input(source1)
+ .name("In Iteration Map")
+ .setBroadcastVariable("BC", iteration.getPartialSolution())
+ .build();
+
+ iteration.setNextPartialSolution(inMap);
+
+ MapOperator postMap = MapOperator.builder(new IdentityMap())
+ .input(source1)
+ .name("Post Iteration Map")
+ .setBroadcastVariable("BC", iteration)
+ .build();
+
+ FileDataSink sink = new FileDataSink(DummyOutputFormat.class, OUT_FILE, postMap, "Sink");
+
+ Plan plan = new Plan(sink);
+
+ try {
+ compileNoStats(plan);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ /**
+	 * Test to ensure that sourceA is the same node inside as well as outside of the iteration.
+ *
+ * <pre>
+ * (SRC A) (SRC B)
+ * / \ / \
+ * (SINK 1) (ITERATION) | (SINK 2)
+ * / \ /
+ * (SINK 3) (CROSS => NEXT PARTIAL SOLUTION)
+ * </pre>
+ */
+ @Test
+ public void testClosure() {
+ FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 1");
+ FileDataSource sourceB = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2");
+
+ FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, sourceA, "Sink 1");
+ FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, sourceB, "Sink 2");
+
+ BulkIteration iteration = new BulkIteration("Loop");
+ iteration.setInput(sourceA);
+ iteration.setMaximumNumberOfIterations(10);
+
+ CrossOperator stepFunction = CrossOperator.builder(DummyCrossStub.class).name("StepFunction").
+ input1(iteration.getPartialSolution()).
+ input2(sourceB).
+ build();
+
+ iteration.setNextPartialSolution(stepFunction);
+
+ FileDataSink sink3 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Sink 3");
+
+ List<FileDataSink> sinks = new ArrayList<FileDataSink>();
+ sinks.add(sink1);
+ sinks.add(sink2);
+ sinks.add(sink3);
+
+ Plan plan = new Plan(sinks);
+
+		try {
+			compileNoStats(plan);
+		} catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ /**
+ * <pre>
+ * (SRC A) (SRC B) (SRC C)
+ * / \ / / \
+ * (SINK 1) (DELTA ITERATION) | (SINK 2)
+ * / | \ /
+ * (SINK 3) | (CROSS => NEXT WORKSET)
+ * | |
+ * (JOIN => SOLUTION SET DELTA)
+ * </pre>
+ */
+ @Test
+ public void testClosureDeltaIteration() {
+ FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 1");
+ FileDataSource sourceB = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2");
+ FileDataSource sourceC = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 3");
+
+ FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, sourceA, "Sink 1");
+ FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, sourceC, "Sink 2");
+
+ DeltaIteration iteration = new DeltaIteration(0, "Loop");
+ iteration.setInitialSolutionSet(sourceA);
+ iteration.setInitialWorkset(sourceB);
+ iteration.setMaximumNumberOfIterations(10);
+
+ CrossOperator nextWorkset = CrossOperator.builder(DummyCrossStub.class).name("Next workset").
+ input1(iteration.getWorkset()).
+ input2(sourceC).
+ build();
+
+ JoinOperator solutionSetDelta = JoinOperator.builder(DummyMatchStub.class, LongValue.class,0,0).
+ name("Next solution set.").
+ input1(nextWorkset).
+ input2(iteration.getSolutionSet()).
+ build();
+
+ iteration.setNextWorkset(nextWorkset);
+ iteration.setSolutionSetDelta(solutionSetDelta);
+
+ FileDataSink sink3 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Sink 3");
+
+ List<FileDataSink> sinks = new ArrayList<FileDataSink>();
+ sinks.add(sink1);
+ sinks.add(sink2);
+ sinks.add(sink3);
+
+ Plan plan = new Plan(sinks);
+
+		try {
+			compileNoStats(plan);
+		} catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ /**
+ * <pre>
+ * +----Iteration-------+
+ * | |
+ * /---------< >---------join-----< >---sink
+ * / (Solution)| / |
+ * / | / |
+ * /--map-------< >----\ / /--|
+ * / (Workset)| \ / / |
+ * src-map | join------/ |
+ * \ | / |
+ * \ +-----/--------------+
+ * \ /
+ * \--reduce-------/
+ * </pre>
+ */
+ @Test
+ public void testDeltaIterationWithStaticInput() {
+ FileDataSource source = new FileDataSource(DummyInputFormat.class, IN_FILE, "source");
+
+ MapOperator mappedSource = MapOperator.builder(IdentityMap.class).
+ input(source).
+ name("Identity mapped source").
+ build();
+
+ ReduceOperator reducedSource = ReduceOperator.builder(IdentityReduce.class).
+ input(source).
+ name("Identity reduce source").
+ build();
+
+ DeltaIteration iteration = new DeltaIteration(0, "Loop");
+ iteration.setMaximumNumberOfIterations(10);
+ iteration.setInitialSolutionSet(source);
+ iteration.setInitialWorkset(mappedSource);
+
+ JoinOperator nextWorkset = JoinOperator.builder(DummyNonPreservingMatchStub.class, IntValue.class, 0, 0).
+ input1(iteration.getWorkset()).
+ input2(reducedSource).
+ name("Next work set").
+ build();
+
+ JoinOperator solutionSetDelta = JoinOperator.builder(DummyNonPreservingMatchStub.class, IntValue.class, 0, 0).
+ input1(iteration.getSolutionSet()).
+ input2(nextWorkset).
+ name("Solution set delta").
+ build();
+
+ iteration.setNextWorkset(nextWorkset);
+ iteration.setSolutionSetDelta(solutionSetDelta);
+
+ FileDataSink sink = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Iteration sink");
+ List<FileDataSink> sinks = new ArrayList<FileDataSink>();
+ sinks.add(sink);
+
+ Plan plan = new Plan(sinks);
+
+ try {
+ compileNoStats(plan);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ /**
+ * <pre>
+ * +---------Iteration-------+
+ * | |
+ * /--map--< >----\ |
+ * / | \ /-------< >---sink
+ * src-map | join------/ |
+ * \ | / |
+ * \ +-----/-------------------+
+ * \ /
+ * \--reduce--/
+ * </pre>
+ */
+ @Test
+ public void testIterationWithStaticInput() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(100);
+
+ DataSet<Long> source = env.generateSequence(1, 1000000);
+
+ DataSet<Long> mapped = source.map(new IdentityMapper<Long>());
+
+ DataSet<Long> reduced = source.groupBy(new IdentityKeyExtractor<Long>()).reduce(new SelectOneReducer<Long>());
+
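+ // 'mapped' drives the iteration while 'reduced' stays outside the loop as a static (loop-invariant) join input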
+ IterativeDataSet<Long> iteration = mapped.iterate(10);
+ iteration.closeWith(
+ iteration.join(reduced)
+ .where(new IdentityKeyExtractor<Long>())
+ .equalTo(new IdentityKeyExtractor<Long>())
+ .with(new DummyFlatJoinFunction<Long>()))
+ .output(new DiscardingOutputFormat<Long>());
+
+ compileNoStats(env.createProgramPlan());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testBranchingBroadcastVariable() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(100);
+
+ DataSet<String> input1 = env.readTextFile(IN_FILE).name("source1");
+ DataSet<String> input2 = env.readTextFile(IN_FILE).name("source2");
+ DataSet<String> input3 = env.readTextFile(IN_FILE).name("source3");
+
+ DataSet<String> result1 = input1
+ .map(new IdentityMapper<String>())
+ .reduceGroup(new Top1GroupReducer<String>())
+ .withBroadcastSet(input3, "bc");
+
+ DataSet<String> result2 = input2
+ .map(new IdentityMapper<String>())
+ .reduceGroup(new Top1GroupReducer<String>())
+ .withBroadcastSet(input3, "bc");
+
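+ // input3 is broadcast to both branches above and again to the join below; result1 doubles as join input and broadcast set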
+ result1.join(result2)
+ .where(new IdentityKeyExtractor<String>())
+ .equalTo(new IdentityKeyExtractor<String>())
+ .with(new RichJoinFunction<String, String, String>() {
+ @Override
+ public String join(String first, String second) {
+ return null;
+ }
+ })
+ .withBroadcastSet(input3, "bc1")
+ .withBroadcastSet(input1, "bc2")
+ .withBroadcastSet(result1, "bc3")
+ .print();
+
+ Plan plan = env.createProgramPlan();
+
+ try {
+ compileNoStats(plan);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testBCVariableClosure() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<String> input = env.readTextFile(IN_FILE).name("source1");
+
+ DataSet<String> reduced = input
+ .map(new IdentityMapper<String>())
+ .reduceGroup(new Top1GroupReducer<String>());
+
+
+ DataSet<String> initialSolution = input.map(new IdentityMapper<String>()).withBroadcastSet(reduced, "bc");
+
+
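+ // 'reduced' closes over the iteration: it is broadcast both outside the loop (initial solution) and inside it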
+ IterativeDataSet<String> iteration = initialSolution.iterate(100);
+
+ iteration.closeWith(iteration.map(new IdentityMapper<String>()).withBroadcastSet(reduced, "red"))
+ .print();
+
+ Plan plan = env.createProgramPlan();
+
+ try {
+ compileNoStats(plan);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testMultipleIterations() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(100);
+
+ DataSet<String> input = env.readTextFile(IN_FILE).name("source1");
+
+ DataSet<String> reduced = input
+ .map(new IdentityMapper<String>())
+ .reduceGroup(new Top1GroupReducer<String>());
+
+ IterativeDataSet<String> iteration1 = input.iterate(100);
+ IterativeDataSet<String> iteration2 = input.iterate(20);
+ IterativeDataSet<String> iteration3 = input.iterate(17);
+
+ iteration1.closeWith(iteration1.map(new IdentityMapper<String>()).withBroadcastSet(reduced, "bc1")).print();
+ iteration2.closeWith(iteration2.reduceGroup(new Top1GroupReducer<String>()).withBroadcastSet(reduced, "bc2")).print();
+ iteration3.closeWith(iteration3.reduceGroup(new IdentityGroupReducer<String>()).withBroadcastSet(reduced, "bc3")).print();
+
+ Plan plan = env.createProgramPlan();
+
+ try {
+ compileNoStats(plan);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testMultipleIterationsWithClosureBCVars() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(100);
+
+ DataSet<String> input = env.readTextFile(IN_FILE).name("source1");
+
+ IterativeDataSet<String> iteration1 = input.iterate(100);
+ IterativeDataSet<String> iteration2 = input.iterate(20);
+ IterativeDataSet<String> iteration3 = input.iterate(17);
+
+
+ iteration1.closeWith(iteration1.map(new IdentityMapper<String>())).print();
+ iteration2.closeWith(iteration2.reduceGroup(new Top1GroupReducer<String>())).print();
+ iteration3.closeWith(iteration3.reduceGroup(new IdentityGroupReducer<String>())).print();
+
+ Plan plan = env.createProgramPlan();
+
+ try {
+ compileNoStats(plan);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testBranchesOnlyInBCVariables1() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(100);
+
+ DataSet<Long> input = env.generateSequence(1, 10);
+ DataSet<Long> bc_input = env.generateSequence(1, 10);
+
+ input
+ .map(new IdentityMapper<Long>()).withBroadcastSet(bc_input, "name1")
+ .map(new IdentityMapper<Long>()).withBroadcastSet(bc_input, "name2")
+ .print();
+
+ Plan plan = env.createProgramPlan();
+ compileNoStats(plan);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testBranchesOnlyInBCVariables2() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(100);
+
+ DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 10).map(new Duplicator<Long>()).name("proper input");
+
+ DataSet<Long> bc_input1 = env.generateSequence(1, 10).name("BC input 1");
+ DataSet<Long> bc_input2 = env.generateSequence(1, 10).name("BC input 2");
+
+ DataSet<Tuple2<Long, Long>> joinInput1 =
+ input.map(new IdentityMapper<Tuple2<Long,Long>>())
+ .withBroadcastSet(bc_input1.map(new IdentityMapper<Long>()), "bc1")
+ .withBroadcastSet(bc_input2, "bc2");
+
+ DataSet<Tuple2<Long, Long>> joinInput2 =
+ input.map(new IdentityMapper<Tuple2<Long,Long>>())
+ .withBroadcastSet(bc_input1, "bc1")
+ .withBroadcastSet(bc_input2, "bc2");
+
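+ // all branching runs through the broadcast inputs; the join and the union below re-unite the map branches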
+ DataSet<Tuple2<Long, Long>> joinResult = joinInput1
+ .join(joinInput2, JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(1)
+ .with(new DummyFlatJoinFunction<Tuple2<Long,Long>>());
+
+ input
+ .map(new IdentityMapper<Tuple2<Long,Long>>())
+ .withBroadcastSet(bc_input1, "bc1")
+ .union(joinResult)
+ .print();
+
+ Plan plan = env.createProgramPlan();
+ compileNoStats(plan);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ private static final class Duplicator<T> implements MapFunction<T, Tuple2<T, T>> {
+
+ @Override
+ public Tuple2<T, T> map(T value) {
+ return new Tuple2<T, T>(value, value);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java
new file mode 100644
index 0000000..c7ad2da
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.junit.Test;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.optimizer.dag.TempMode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
+
+@SuppressWarnings("serial")
+public class BroadcastVariablePipelinebreakerTest extends CompilerTestBase {
+
+ @Test
+ public void testNoBreakerForIndependentVariable() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<String> source1 = env.fromElements("test");
+ DataSet<String> source2 = env.fromElements("test");
+
+ source1.map(new IdentityMapper<String>()).withBroadcastSet(source2, "some name").print();
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ SinkPlanNode sink = op.getDataSinks().iterator().next();
+ SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource();
+
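+ // the broadcast side comes from an independent source, so no pipeline breaker should be inserted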
+ assertEquals(TempMode.NONE, mapper.getInput().getTempMode());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testBreakerForDependentVariable() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<String> source1 = env.fromElements("test");
+
+ source1.map(new IdentityMapper<String>()).map(new IdentityMapper<String>()).withBroadcastSet(source1, "some name").print();
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ SinkPlanNode sink = op.getDataSinks().iterator().next();
+ SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource();
+
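+ // the broadcast variable is derived from the mapper's own input, so a pipeline breaker must materialize the channel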
+ assertEquals(TempMode.PIPELINE_BREAKER, mapper.getInput().getTempMode());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
new file mode 100644
index 0000000..3e7da6c
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.junit.Test;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.GenericDataSourceBase;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.api.common.functions.RichJoinFunction;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.optimizer.dag.TempMode;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+/**
+ * Tests that validate the optimizer's choice when using hash joins inside of iterations.
+ */
+@SuppressWarnings("serial")
+public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
+
+ /**
+ * This tests whether a HYBRIDHASH_BUILD_SECOND is correctly transformed to a HYBRIDHASH_BUILD_SECOND_CACHED
+ * when it is inside of an iteration and on the static path.
+ */
+ @Test
+ public void testRightSide() {
+ try {
+
+ Plan plan = getTestPlanRightStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND);
+
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
+ DualInputPlanNode innerJoin = resolver.getNode("DummyJoiner");
+
+ // verify correct join strategy
+ assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED, innerJoin.getDriverStrategy());
+ assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode());
+ assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode());
+
+ new JobGraphGenerator().compileJobGraph(oPlan);
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail("Test errored: " + e.getMessage());
+ }
+ }
+
+ /**
+ * This test makes sure that only a HYBRIDHASH on the static path is transformed to the cached variant
+ */
+ @Test
+ public void testRightSideCountercheck() {
+ try {
+
+ Plan plan = getTestPlanRightStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST);
+
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
+ DualInputPlanNode innerJoin = resolver.getNode("DummyJoiner");
+
+ // verify correct join strategy
+ assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST, innerJoin.getDriverStrategy());
+ assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode());
+ assertEquals(TempMode.CACHED, innerJoin.getInput2().getTempMode());
+
+ new JobGraphGenerator().compileJobGraph(oPlan);
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail("Test errored: " + e.getMessage());
+ }
+ }
+
+ /**
+ * This tests whether a HYBRIDHASH_BUILD_FIRST is correctly transformed to a HYBRIDHASH_BUILD_FIRST_CACHED
+ * when it is inside of an iteration and on the static path.
+ */
+ @Test
+ public void testLeftSide() {
+ try {
+
+ Plan plan = getTestPlanLeftStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST);
+
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
+ DualInputPlanNode innerJoin = resolver.getNode("DummyJoiner");
+
+ // verify correct join strategy
+ assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED, innerJoin.getDriverStrategy());
+ assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode());
+ assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode());
+
+ new JobGraphGenerator().compileJobGraph(oPlan);
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail("Test errored: " + e.getMessage());
+ }
+ }
+
+ /**
+ * This test makes sure that only a HYBRIDHASH on the static path is transformed to the cached variant
+ */
+ @Test
+ public void testLeftSideCountercheck() {
+ try {
+
+ Plan plan = getTestPlanLeftStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND);
+
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
+ DualInputPlanNode innerJoin = resolver.getNode("DummyJoiner");
+
+ // verify correct join strategy
+ assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, innerJoin.getDriverStrategy());
+ assertEquals(TempMode.CACHED, innerJoin.getInput1().getTempMode());
+ assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode());
+
+ new JobGraphGenerator().compileJobGraph(oPlan);
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail("Test errored: " + e.getMessage());
+ }
+ }
+
+ /**
+ * This test simulates a join of a big left side with a small right side inside of an iteration, where the small side is on a static path.
+ * Currently the best execution plan is a HYBRIDHASH_BUILD_SECOND_CACHED, where the small side is hashed and cached.
+ * This test also makes sure that all relevant plans are correctly enumerated by the optimizer.
+ */
+ @Test
+ public void testCorrectChoosing() {
+ try {
+
+ Plan plan = getTestPlanRightStatic("");
+
+ SourceCollectorVisitor sourceCollector = new SourceCollectorVisitor();
+ plan.accept(sourceCollector);
+
+ for (GenericDataSourceBase<?, ?> s : sourceCollector.getSources()) {
+ if (s.getName().equals("bigFile")) {
+ this.setSourceStatistics(s, 10000000, 1000);
+ }
+ else if (s.getName().equals("smallFile")) {
+ this.setSourceStatistics(s, 100, 100);
+ }
+ }
+
+
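+ // with a large build side and a small static side, the cached hash variant on the small side should win plan enumeration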
+ OptimizedPlan oPlan = compileNoStats(plan);
+
+ OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
+ DualInputPlanNode innerJoin = resolver.getNode("DummyJoiner");
+
+ // verify correct join strategy
+ assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED, innerJoin.getDriverStrategy());
+ assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode());
+ assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode());
+
+ new JobGraphGenerator().compileJobGraph(oPlan);
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail("Test errored: " + e.getMessage());
+ }
+ }
+
+ private Plan getTestPlanRightStatic(String strategy) {
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+ DataSet<Tuple3<Long, Long, Long>> bigInput = env.readCsvFile("file://bigFile").types(Long.class, Long.class, Long.class).name("bigFile");
+
+ DataSet<Tuple3<Long, Long, Long>> smallInput = env.readCsvFile("file://smallFile").types(Long.class, Long.class, Long.class).name("smallFile");
+
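+ // only bigInput enters the iteration; smallInput remains outside and is therefore on the static path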
+ IterativeDataSet<Tuple3<Long, Long, Long>> iteration = bigInput.iterate(10);
+
+ Configuration joinStrategy = new Configuration();
+ joinStrategy.setString(Optimizer.HINT_SHIP_STRATEGY, Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH);
+
+ if(strategy != "") {
+ joinStrategy.setString(Optimizer.HINT_LOCAL_STRATEGY, strategy);
+ }
+
+ DataSet<Tuple3<Long, Long, Long>> inner = iteration.join(smallInput).where(0).equalTo(0).with(new DummyJoiner()).name("DummyJoiner").withParameters(joinStrategy);
+
+ DataSet<Tuple3<Long, Long, Long>> output = iteration.closeWith(inner);
+
+ output.print();
+
+ return env.createProgramPlan();
+
+ }
+
+ private Plan getTestPlanLeftStatic(String strategy) {
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+ @SuppressWarnings("unchecked")
+ DataSet<Tuple3<Long, Long, Long>> bigInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L),
+ new Tuple3<Long, Long, Long>(1L, 2L, 3L),new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Big");
+
+ @SuppressWarnings("unchecked")
+ DataSet<Tuple3<Long, Long, Long>> smallInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Small");
+
+ IterativeDataSet<Tuple3<Long, Long, Long>> iteration = bigInput.iterate(10);
+
+ Configuration joinStrategy = new Configuration();
+ joinStrategy.setString(Optimizer.HINT_LOCAL_STRATEGY, strategy);
+
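+ // mirror of the right-static plan: here the static (small) side is the first join input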
+ DataSet<Tuple3<Long, Long, Long>> inner = smallInput.join(iteration).where(0).equalTo(0).with(new DummyJoiner()).name("DummyJoiner").withParameters(joinStrategy);
+
+ DataSet<Tuple3<Long, Long, Long>> output = iteration.closeWith(inner);
+
+ output.print();
+
+ return env.createProgramPlan();
+
+ }
+
+ private static class DummyJoiner extends RichJoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>> {
+
+ @Override
+ public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first,
+ Tuple3<Long, Long, Long> second) throws Exception {
+
+ return first;
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java
new file mode 100644
index 0000000..eba07f1
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import org.apache.flink.api.common.functions.RichCoGroupFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Visitor;
+
+@SuppressWarnings("serial")
+public class CoGroupSolutionSetFirstTest extends CompilerTestBase {
+
+ public static class SimpleCGroup extends RichCoGroupFunction<Tuple1<Integer>, Tuple1<Integer>, Tuple1<Integer>> {
+ @Override
+ public void coGroup(Iterable<Tuple1<Integer>> first, Iterable<Tuple1<Integer>> second, Collector<Tuple1<Integer>> out) {}
+ }
+
+ public static class SimpleMap extends RichMapFunction<Tuple1<Integer>, Tuple1<Integer>> {
+ @Override
+ public Tuple1<Integer> map(Tuple1<Integer> value) throws Exception {
+ return null;
+ }
+ }
+
+ @Test
+ public void testCoGroupSolutionSet() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple1<Integer>> raw = env.readCsvFile(IN_FILE).types(Integer.class);
+
+ DeltaIteration<Tuple1<Integer>, Tuple1<Integer>> iteration = raw.iterateDelta(raw, 1000, 0);
+
+ DataSet<Tuple1<Integer>> test = iteration.getWorkset().map(new SimpleMap());
+ DataSet<Tuple1<Integer>> delta = iteration.getSolutionSet().coGroup(test).where(0).equalTo(0).with(new SimpleCGroup());
+ DataSet<Tuple1<Integer>> feedback = iteration.getWorkset().map(new SimpleMap());
+ DataSet<Tuple1<Integer>> result = iteration.closeWith(delta, feedback);
+
+ result.print();
+
+ Plan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = null;
+ try {
+ oPlan = compileNoStats(plan);
+ } catch (CompilerException e) {
+ Assert.fail(e.getMessage());
+ }
+
+ oPlan.accept(new Visitor<PlanNode>() {
+ @Override
+ public boolean preVisit(PlanNode visitable) {
+ if (visitable instanceof WorksetIterationPlanNode) {
+ PlanNode deltaNode = ((WorksetIterationPlanNode) visitable).getSolutionSetDeltaPlanNode();
+
+ // get the CoGroup
+ DualInputPlanNode dpn = (DualInputPlanNode) deltaNode.getInputs().iterator().next().getSource();
+ Channel in1 = dpn.getInput1();
+ Channel in2 = dpn.getInput2();
+
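+ // the solution set side (first input) must arrive unsorted via FORWARD; the probe side must be sorted on the key and hash-partitioned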
+ Assert.assertNull(in1.getLocalProperties().getOrdering());
+ Assert.assertNotNull(in2.getLocalProperties().getOrdering());
+ Assert.assertTrue(in2.getLocalProperties().getOrdering().getInvolvedIndexes().contains(0));
+ Assert.assertEquals(ShipStrategyType.FORWARD, in1.getShipStrategy());
+ Assert.assertEquals(ShipStrategyType.PARTITION_HASH, in2.getShipStrategy());
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void postVisit(PlanNode visitable) {}
+ });
+ }
+}