You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:06:52 UTC
[13/53] [abbrv] flink git commit: [optimizer] Rename optimizer
project to "flink-optimizer" (previously flink-compiler)
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BulkPartialSolutionPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BulkPartialSolutionPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BulkPartialSolutionPlanNode.java
new file mode 100644
index 0000000..df05b64
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BulkPartialSolutionPlanNode.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.plan;
+
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
+
+import java.util.Collections;
+import java.util.HashMap;
+
+import org.apache.flink.optimizer.costs.Costs;
+import org.apache.flink.optimizer.dag.BulkPartialSolutionNode;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.runtime.operators.DamBehavior;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.util.Visitor;
+
+/**
+ * Plan candidate node for the partial solution of a bulk iteration.
+ * <p>
+ * Within the step function this node is a leaf: it reports no predecessors and no inputs,
+ * and its node costs and cumulative costs are both zero. The channel delivering the initial
+ * partial solution is still consulted when searching for dams on a path.
+ */
+public class BulkPartialSolutionPlanNode extends PlanNode {
+
+	/** Zero costs, assigned as both node and cumulative costs of every partial solution node. */
+	private static final Costs NO_COSTS = new Costs();
+
+	/** The bulk iteration plan node this partial solution belongs to (set after construction). */
+	private BulkIterationPlanNode containingIterationNode;
+
+	/** The channel that supplies the initial partial solution. */
+	private Channel initialInput;
+
+	public Object postPassHelper;
+
+
+	public BulkPartialSolutionPlanNode(BulkPartialSolutionNode template, String nodeName,
+			GlobalProperties gProps, LocalProperties lProps,
+			Channel initialInput)
+	{
+		super(template, nodeName, DriverStrategy.NONE);
+
+		this.globalProps = gProps;
+		this.localProps = lProps;
+		this.initialInput = initialInput;
+
+		// the partial solution itself is free of cost
+		this.nodeCosts = NO_COSTS;
+		this.cumulativeCosts = NO_COSTS;
+
+		// copy over the branch plan entries of the node that produces the initial input
+		final PlanNode initialSource = initialInput.getSource();
+		final boolean sourceHasBranches =
+				initialSource.branchPlan != null && !initialSource.branchPlan.isEmpty();
+		if (sourceHasBranches) {
+			if (this.branchPlan == null) {
+				this.branchPlan = new HashMap<OptimizerNode, PlanNode>();
+			}
+			this.branchPlan.putAll(initialSource.branchPlan);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Gets the optimizer node this plan node was instantiated from.
+	 *
+	 * @return The template, cast to {@link BulkPartialSolutionNode}.
+	 */
+	public BulkPartialSolutionNode getPartialSolutionNode() {
+		return (BulkPartialSolutionNode) this.template;
+	}
+
+	/**
+	 * Gets the iteration plan node containing this partial solution.
+	 *
+	 * @return The containing iteration node, or {@code null} if none has been set yet.
+	 */
+	public BulkIterationPlanNode getContainingIterationNode() {
+		return this.containingIterationNode;
+	}
+
+	/**
+	 * Sets the iteration plan node containing this partial solution.
+	 *
+	 * @param containingIterationNode The containing iteration node.
+	 */
+	public void setContainingIterationNode(BulkIterationPlanNode containingIterationNode) {
+		this.containingIterationNode = containingIterationNode;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void accept(Visitor<PlanNode> visitor) {
+		// leaf node: nothing to descend into between pre- and post-visit
+		final boolean descend = visitor.preVisit(this);
+		if (descend) {
+			visitor.postVisit(this);
+		}
+	}
+
+	@Override
+	public Iterable<PlanNode> getPredecessors() {
+		return Collections.<PlanNode>emptyList();
+	}
+
+	@Override
+	public Iterable<Channel> getInputs() {
+		return Collections.<Channel>emptyList();
+	}
+
+	@Override
+	public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
+		if (source == this) {
+			return FOUND_SOURCE;
+		}
+
+		final Channel in = this.initialInput;
+		final SourceAndDamReport below = in.getSource().hasDamOnPathDownTo(source);
+
+		if (below == FOUND_SOURCE_AND_DAM) {
+			return FOUND_SOURCE_AND_DAM;
+		}
+		if (below != FOUND_SOURCE) {
+			return NOT_FOUND;
+		}
+
+		// the source is reachable without a dam so far; check whether this hop introduces one
+		final boolean dammedHere = in.getLocalStrategy().dams()
+				|| in.getTempMode().breaksPipeline()
+				|| getDriverStrategy().firstDam() == DamBehavior.FULL_DAM;
+		return dammedHere ? FOUND_SOURCE_AND_DAM : FOUND_SOURCE;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java
new file mode 100644
index 0000000..875d1c3
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plan;
+
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.dag.EstimateProvider;
+import org.apache.flink.optimizer.dag.TempMode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plandump.DumpableConnection;
+import org.apache.flink.optimizer.util.Utils;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+
+/**
+ * A Channel represents the result produced by an operator and the data exchange
+ * before the consumption by the target operator.
+ * <p>
+ * The channel defines and tracks various properties and characteristics of the
+ * data set and data exchange.
+ * <p>
+ * Data set characteristics:
+ * <ul>
+ *     <li>The "global properties" of the data, i.e., how the data is distributed across
+ *         partitions</li>
+ *     <li>The "required global properties" of the data, i.e., the global properties that, if absent,
+ *         would cause the program to return a wrong result.</li>
+ *     <li>The "local properties" of the data, i.e., how the data is organized within a partition</li>
+ *     <li>The "required local properties" of the data, i.e., the local properties that, if absent,
+ *         would cause the program to return a wrong result.</li>
+ * </ul>
+ * <p>
+ * Data exchange parameters:
+ * <ul>
+ *     <li>The "ship strategy", i.e., whether to forward the data, shuffle it, broadcast it, ...</li>
+ *     <li>The "ship keys", which are the positions of the key fields in the exchanged records.</li>
+ *     <li>The "data exchange mode", which defines whether to pipeline or batch the exchange</li>
+ *     <li>Several more...</li>
+ * </ul>
+ * <p>
+ * The global and local properties are derived lazily from the source node and the configured
+ * strategies, and then cached. Changing the ship strategy invalidates both caches; changing the
+ * local strategy invalidates the local properties.
+ */
+public class Channel implements EstimateProvider, Cloneable, DumpableConnection<PlanNode> {
+
+	private PlanNode source;
+
+	private PlanNode target;
+
+	private ShipStrategyType shipStrategy = ShipStrategyType.NONE;
+
+	private DataExchangeMode dataExchangeMode;
+
+	private LocalStrategy localStrategy = LocalStrategy.NONE;
+
+	private FieldList shipKeys;
+
+	private FieldList localKeys;
+
+	private boolean[] shipSortOrder;
+
+	private boolean[] localSortOrder;
+
+	private RequestedGlobalProperties requiredGlobalProps;
+
+	private RequestedLocalProperties requiredLocalProps;
+
+	/** Lazily computed cache, see {@link #getGlobalProperties()}. */
+	private GlobalProperties globalProps;
+
+	/** Lazily computed cache, see {@link #getLocalProperties()}. */
+	private LocalProperties localProps;
+
+	private TypeSerializerFactory<?> serializer;
+
+	private TypeComparatorFactory<?> shipStrategyComparator;
+
+	private TypeComparatorFactory<?> localStrategyComparator;
+
+	private DataDistribution dataDistribution;
+
+	private Partitioner<?> partitioner;
+
+	private TempMode tempMode;
+
+	private double relativeTempMemory;
+
+	private double relativeMemoryLocalStrategy;
+
+	/** Multiplier applied to the source's size estimates (set for broadcast channels). */
+	private int replicationFactor = 1;
+
+	// --------------------------------------------------------------------------------------------
+
+	public Channel(PlanNode sourceNode) {
+		this(sourceNode, null);
+	}
+
+	/**
+	 * Creates a channel originating at the given node.
+	 *
+	 * @param sourceNode The node producing the data of this channel.
+	 * @param tempMode The temp mode; {@code null} is treated as {@link TempMode#NONE}.
+	 */
+	public Channel(PlanNode sourceNode, TempMode tempMode) {
+		this.source = sourceNode;
+		this.tempMode = (tempMode == null ? TempMode.NONE : tempMode);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Accessors
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Gets the source of this Channel.
+	 *
+	 * @return The source.
+	 */
+	@Override
+	public PlanNode getSource() {
+		return this.source;
+	}
+
+	/**
+	 * Sets the target of this Channel.
+	 *
+	 * @param target The target.
+	 */
+	public void setTarget(PlanNode target) {
+		this.target = target;
+	}
+
+	/**
+	 * Gets the target of this Channel.
+	 *
+	 * @return The target.
+	 */
+	public PlanNode getTarget() {
+		return this.target;
+	}
+
+	public void setShipStrategy(ShipStrategyType strategy, DataExchangeMode dataExchangeMode) {
+		setShipStrategy(strategy, null, null, null, dataExchangeMode);
+	}
+
+	public void setShipStrategy(ShipStrategyType strategy, FieldList keys, DataExchangeMode dataExchangeMode) {
+		setShipStrategy(strategy, keys, null, null, dataExchangeMode);
+	}
+
+	public void setShipStrategy(ShipStrategyType strategy, FieldList keys,
+			boolean[] sortDirection, DataExchangeMode dataExchangeMode) {
+		setShipStrategy(strategy, keys, sortDirection, null, dataExchangeMode);
+	}
+
+	public void setShipStrategy(ShipStrategyType strategy, FieldList keys,
+			Partitioner<?> partitioner, DataExchangeMode dataExchangeMode) {
+		setShipStrategy(strategy, keys, null, partitioner, dataExchangeMode);
+	}
+
+	/**
+	 * Sets the ship strategy and its parameters, and invalidates the cached global and local
+	 * properties so that they are recomputed on the next access.
+	 *
+	 * @param strategy The ship strategy.
+	 * @param keys The ship keys, may be {@code null}.
+	 * @param sortDirection The sort directions of the ship keys (used for range partitioning), may be {@code null}.
+	 * @param partitioner The custom partitioner, may be {@code null}.
+	 * @param dataExchangeMode The data exchange mode.
+	 */
+	public void setShipStrategy(ShipStrategyType strategy, FieldList keys,
+			boolean[] sortDirection, Partitioner<?> partitioner,
+			DataExchangeMode dataExchangeMode) {
+		this.shipStrategy = strategy;
+		this.shipKeys = keys;
+		this.shipSortOrder = sortDirection;
+		this.partitioner = partitioner;
+		this.dataExchangeMode = dataExchangeMode;
+		this.globalProps = null; // reset the global properties
+		// the local properties after shipping depend on the ship strategy as well
+		// (see computeLocalPropertiesAfterShippingOnly), so they must be recomputed, too
+		this.localProps = null;
+	}
+
+	/**
+	 * Gets the data exchange mode (batch / streaming) to use for the data
+	 * exchange of this channel.
+	 *
+	 * @return The data exchange mode of this channel.
+	 */
+	public DataExchangeMode getDataExchangeMode() {
+		return dataExchangeMode;
+	}
+
+	public ShipStrategyType getShipStrategy() {
+		return this.shipStrategy;
+	}
+
+	public FieldList getShipStrategyKeys() {
+		return this.shipKeys;
+	}
+
+	public boolean[] getShipStrategySortOrder() {
+		return this.shipSortOrder;
+	}
+
+	public void setLocalStrategy(LocalStrategy strategy) {
+		setLocalStrategy(strategy, null, null);
+	}
+
+	/**
+	 * Sets the local strategy and its parameters, and invalidates the cached local properties.
+	 *
+	 * @param strategy The local strategy.
+	 * @param keys The local strategy keys, may be {@code null}.
+	 * @param sortDirection The sort directions of the local keys, may be {@code null}.
+	 */
+	public void setLocalStrategy(LocalStrategy strategy, FieldList keys, boolean[] sortDirection) {
+		this.localStrategy = strategy;
+		this.localKeys = keys;
+		this.localSortOrder = sortDirection;
+		this.localProps = null; // reset the local properties
+	}
+
+	public LocalStrategy getLocalStrategy() {
+		return this.localStrategy;
+	}
+
+	public FieldList getLocalStrategyKeys() {
+		return this.localKeys;
+	}
+
+	public boolean[] getLocalStrategySortOrder() {
+		return this.localSortOrder;
+	}
+
+	public void setDataDistribution(DataDistribution dataDistribution) {
+		this.dataDistribution = dataDistribution;
+	}
+
+	public DataDistribution getDataDistribution() {
+		return this.dataDistribution;
+	}
+
+	public Partitioner<?> getPartitioner() {
+		return partitioner;
+	}
+
+	public TempMode getTempMode() {
+		return this.tempMode;
+	}
+
+	/**
+	 * Sets the temp mode of the connection.
+	 *
+	 * @param tempMode
+	 *        The temp mode of the connection; {@code null} is treated as {@link TempMode#NONE},
+	 *        exactly as in the constructor.
+	 */
+	public void setTempMode(TempMode tempMode) {
+		// normalize like the constructor so that getTempMode() does not return null
+		// (callers invoke getTempMode().breaksPipeline() without a null check)
+		this.tempMode = (tempMode == null ? TempMode.NONE : tempMode);
+	}
+
+	/**
+	 * Gets the memory for materializing the channel's result from this Channel.
+	 *
+	 * @return The temp memory.
+	 */
+	public double getRelativeTempMemory() {
+		return this.relativeTempMemory;
+	}
+
+	/**
+	 * Sets the memory for materializing the channel's result from this Channel.
+	 *
+	 * @param relativeTempMemory The memory for materialization.
+	 */
+	public void setRelativeTempMemory(double relativeTempMemory) {
+		this.relativeTempMemory = relativeTempMemory;
+	}
+
+	/**
+	 * Sets the replication factor of the connection.
+	 *
+	 * @param factor The replication factor of the connection.
+	 */
+	public void setReplicationFactor(int factor) {
+		this.replicationFactor = factor;
+	}
+
+	/**
+	 * Returns the replication factor of the connection.
+	 *
+	 * @return The replication factor of the connection.
+	 */
+	public int getReplicationFactor() {
+		return this.replicationFactor;
+	}
+
+	/**
+	 * Gets the serializer from this Channel.
+	 *
+	 * @return The serializer.
+	 */
+	public TypeSerializerFactory<?> getSerializer() {
+		return serializer;
+	}
+
+	/**
+	 * Sets the serializer for this Channel.
+	 *
+	 * @param serializer The serializer to set.
+	 */
+	public void setSerializer(TypeSerializerFactory<?> serializer) {
+		this.serializer = serializer;
+	}
+
+	/**
+	 * Gets the ship strategy comparator from this Channel.
+	 *
+	 * @return The ship strategy comparator.
+	 */
+	public TypeComparatorFactory<?> getShipStrategyComparator() {
+		return shipStrategyComparator;
+	}
+
+	/**
+	 * Sets the ship strategy comparator for this Channel.
+	 *
+	 * @param shipStrategyComparator The ship strategy comparator to set.
+	 */
+	public void setShipStrategyComparator(TypeComparatorFactory<?> shipStrategyComparator) {
+		this.shipStrategyComparator = shipStrategyComparator;
+	}
+
+	/**
+	 * Gets the local strategy comparator from this Channel.
+	 *
+	 * @return The local strategy comparator.
+	 */
+	public TypeComparatorFactory<?> getLocalStrategyComparator() {
+		return localStrategyComparator;
+	}
+
+	/**
+	 * Sets the local strategy comparator for this Channel.
+	 *
+	 * @param localStrategyComparator The local strategy comparator to set.
+	 */
+	public void setLocalStrategyComparator(TypeComparatorFactory<?> localStrategyComparator) {
+		this.localStrategyComparator = localStrategyComparator;
+	}
+
+	public double getRelativeMemoryLocalStrategy() {
+		return relativeMemoryLocalStrategy;
+	}
+
+	public void setRelativeMemoryLocalStrategy(double relativeMemoryLocalStrategy) {
+		this.relativeMemoryLocalStrategy = relativeMemoryLocalStrategy;
+	}
+
+	public boolean isOnDynamicPath() {
+		return this.source.isOnDynamicPath();
+	}
+
+	public int getCostWeight() {
+		return this.source.getCostWeight();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Statistic Estimates
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Gets the source's output size estimate multiplied by the replication factor. Negative
+	 * estimates are passed through unchanged (presumably they mark an unknown estimate).
+	 */
+	@Override
+	public long getEstimatedOutputSize() {
+		long estimate = this.source.template.getEstimatedOutputSize();
+		return estimate < 0 ? estimate : estimate * this.replicationFactor;
+	}
+
+	/**
+	 * Gets the source's record count estimate multiplied by the replication factor. Negative
+	 * estimates are passed through unchanged (presumably they mark an unknown estimate).
+	 */
+	@Override
+	public long getEstimatedNumRecords() {
+		long estimate = this.source.template.getEstimatedNumRecords();
+		return estimate < 0 ? estimate : estimate * this.replicationFactor;
+	}
+
+	/**
+	 * Gets the source's average record width estimate; replication does not change it.
+	 */
+	@Override
+	public float getEstimatedAvgWidthPerOutputRecord() {
+		return this.source.template.getEstimatedAvgWidthPerOutputRecord();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Data Property Handling
+	// --------------------------------------------------------------------------------------------
+
+
+	public RequestedGlobalProperties getRequiredGlobalProps() {
+		return requiredGlobalProps;
+	}
+
+	public void setRequiredGlobalProps(RequestedGlobalProperties requiredGlobalProps) {
+		this.requiredGlobalProps = requiredGlobalProps;
+	}
+
+	public RequestedLocalProperties getRequiredLocalProps() {
+		return requiredLocalProps;
+	}
+
+	public void setRequiredLocalProps(RequestedLocalProperties requiredLocalProps) {
+		this.requiredLocalProps = requiredLocalProps;
+	}
+
+	/**
+	 * Gets the global properties of the data after it has been shipped through this channel.
+	 * They are derived from a clone of the source's global properties, adjusted for the ship
+	 * strategy, and cached until the ship strategy changes.
+	 *
+	 * @return The global properties after shipping.
+	 * @throws CompilerException If the ship strategy has not been set or is unknown.
+	 */
+	public GlobalProperties getGlobalProperties() {
+		if (this.globalProps == null) {
+			// Compute into a local and cache only on success. Otherwise a failed attempt
+			// would leave the unadjusted clone in the cache and later calls would return it.
+			final GlobalProperties props = this.source.getGlobalProperties().clone();
+			switch (this.shipStrategy) {
+				case BROADCAST:
+					props.clearUniqueFieldCombinations();
+					props.setFullyReplicated();
+					break;
+				case PARTITION_HASH:
+					props.setHashPartitioned(this.shipKeys);
+					break;
+				case PARTITION_RANGE:
+					props.setRangePartitioned(Utils.createOrdering(this.shipKeys, this.shipSortOrder));
+					break;
+				case FORWARD:
+					break;
+				case PARTITION_RANDOM:
+					props.reset();
+					break;
+				case PARTITION_FORCED_REBALANCE:
+					props.setForcedRebalanced();
+					break;
+				case PARTITION_CUSTOM:
+					props.setCustomPartitioned(this.shipKeys, this.partitioner);
+					break;
+				case NONE:
+					throw new CompilerException("Cannot produce GlobalProperties before ship strategy is set.");
+				default:
+					throw new CompilerException("Unknown ShipStrategy.");
+			}
+			this.globalProps = props;
+		}
+
+		return this.globalProps;
+	}
+
+	/**
+	 * Gets the local properties of the data after shipping and applying the local strategy.
+	 * The result is cached until the ship strategy or the local strategy changes.
+	 *
+	 * @return The local properties at the consumer side of this channel.
+	 * @throws CompilerException If the ship strategy is not set or unknown, or the local
+	 *         strategy is not supported for channels.
+	 */
+	public LocalProperties getLocalProperties() {
+		if (this.localProps == null) {
+			// as in getGlobalProperties(): only cache a fully computed result
+			LocalProperties props = computeLocalPropertiesAfterShippingOnly();
+			switch (this.localStrategy) {
+				case NONE:
+					break;
+				case SORT:
+				case COMBININGSORT:
+					props = LocalProperties.forOrdering(Utils.createOrdering(this.localKeys, this.localSortOrder));
+					break;
+				default:
+					throw new CompilerException("Unsupported local strategy for channel.");
+			}
+			this.localProps = props;
+		}
+
+		return this.localProps;
+	}
+
+	/**
+	 * Computes the local properties resulting from the ship strategy alone. Every
+	 * re-partitioning or broadcasting strategy yields empty local properties, while FORWARD
+	 * keeps the source's local properties.
+	 */
+	private LocalProperties computeLocalPropertiesAfterShippingOnly() {
+		switch (this.shipStrategy) {
+			case BROADCAST:
+			case PARTITION_HASH:
+			case PARTITION_CUSTOM:
+			case PARTITION_RANGE:
+			case PARTITION_RANDOM:
+			case PARTITION_FORCED_REBALANCE:
+				return new LocalProperties();
+			case FORWARD:
+				return this.source.getLocalProperties();
+			case NONE:
+				throw new CompilerException("ShipStrategy has not yet been set.");
+			default:
+				throw new CompilerException("Unknown ShipStrategy.");
+		}
+	}
+
+	/**
+	 * Checks that the ship strategy of this channel can bridge a change in parallelism between
+	 * source and target, computing the global properties first if necessary. FORWARD cannot
+	 * bridge such a change; all re-partitioning and broadcasting strategies can.
+	 *
+	 * @throws IllegalStateException If no ship strategy has been set.
+	 * @throws CompilerException If the ship strategy is FORWARD or unrecognized.
+	 */
+	public void adjustGlobalPropertiesForFullParallelismChange() {
+		if (this.shipStrategy == null || this.shipStrategy == ShipStrategyType.NONE) {
+			throw new IllegalStateException("Cannot adjust channel for degree of parallelism " +
+					"change before the ship strategy is set.");
+		}
+
+		// make sure the properties are acquired
+		if (this.globalProps == null) {
+			getGlobalProperties();
+		}
+
+		// some strategies globally reestablish properties
+		switch (this.shipStrategy) {
+			case FORWARD:
+				throw new CompilerException("Cannot use FORWARD strategy between operations " +
+						"with different number of parallel instances.");
+			case NONE: // excluded by sanity check. left here for verification check completion
+			case BROADCAST:
+			case PARTITION_HASH:
+			case PARTITION_RANGE:
+			case PARTITION_RANDOM:
+			case PARTITION_FORCED_REBALANCE:
+			case PARTITION_CUSTOM:
+				return;
+		}
+		throw new CompilerException("Unrecognized Ship Strategy Type: " + this.shipStrategy);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Utility method used while swapping binary union nodes for n-ary union nodes.
+	 *
+	 * @param newUnionNode The node that replaces the current (binary union) source.
+	 * @throws IllegalStateException If the current source is not a {@link BinaryUnionPlanNode}.
+	 */
+	public void swapUnionNodes(PlanNode newUnionNode) {
+		if (!(this.source instanceof BinaryUnionPlanNode)) {
+			throw new IllegalStateException("Cannot swap union nodes: the channel source is not a "
+					+ "binary union node, but " + this.source);
+		}
+		this.source = newUnionNode;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Gets the depth of this channel: one more than the maximal depth of the source's optimizer node.
+	 */
+	public int getMaxDepth() {
+		return this.source.getOptimizerNode().getMaxDepth() + 1;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		return "Channel (" + this.source + (this.target == null ? ')' : ") -> (" + this.target + ')') +
+				'[' + this.shipStrategy + "] [" + this.localStrategy + "] " +
+				(this.tempMode == null || this.tempMode == TempMode.NONE ? "{NO-TEMP}" : this.tempMode);
+	}
+
+	/**
+	 * Creates a shallow copy of this channel ({@link Object#clone()}): nodes, key lists, sort-order
+	 * arrays, and any cached properties are shared with the original.
+	 */
+	@Override
+	public Channel clone() {
+		try {
+			return (Channel) super.clone();
+		} catch (CloneNotSupportedException cnsex) {
+			throw new RuntimeException(cnsex);
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java
new file mode 100644
index 0000000..01c56dd
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.plan;
+
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
+import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dag.TwoInputNode;
+import org.apache.flink.runtime.operators.DamBehavior;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.util.Visitor;
+
+/**
+ * Plan candidate node for an operator that consumes two regular input channels, plus any
+ * broadcast inputs. It records the driver key fields and sort orders for the two inputs, as
+ * well as the comparators and the pair comparator configured for the driver.
+ */
+public class DualInputPlanNode extends PlanNode {
+
+	protected final Channel input1;
+	protected final Channel input2;
+
+	protected final FieldList keys1;
+	protected final FieldList keys2;
+
+	protected final boolean[] sortOrders;
+
+	private TypeComparatorFactory<?> comparator1;
+	private TypeComparatorFactory<?> comparator2;
+	private TypePairComparatorFactory<?, ?> pairComparator;
+
+	public Object postPassHelper1;
+	public Object postPassHelper2;
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Creates a node without driver key fields and without sort orders.
+	 */
+	public DualInputPlanNode(OptimizerNode template, String nodeName, Channel input1, Channel input2, DriverStrategy driverStrategy) {
+		this(template, nodeName, input1, input2, driverStrategy, null, null, null);
+	}
+
+	/**
+	 * Creates a node whose sort orders are {@code SingleInputPlanNode.getTrueArray(driverKeyFields1.size())}.
+	 * {@code driverKeyFields1} must therefore not be {@code null}.
+	 */
+	public DualInputPlanNode(OptimizerNode template, String nodeName, Channel input1, Channel input2,
+			DriverStrategy driverStrategy, FieldList driverKeyFields1, FieldList driverKeyFields2)
+	{
+		this(template, nodeName, input1, input2, driverStrategy, driverKeyFields1, driverKeyFields2,
+				SingleInputPlanNode.getTrueArray(driverKeyFields1.size()));
+	}
+
+	/**
+	 * Creates a node with explicit driver keys and sort orders. Inputs shipped with the
+	 * BROADCAST strategy get a replication factor equal to this node's parallelism. The
+	 * branch plans of both input sources are merged into this node.
+	 */
+	public DualInputPlanNode(OptimizerNode template, String nodeName, Channel input1, Channel input2, DriverStrategy driverStrategy,
+			FieldList driverKeyFields1, FieldList driverKeyFields2, boolean[] driverSortOrders)
+	{
+		super(template, nodeName, driverStrategy);
+		this.input1 = input1;
+		this.input2 = input2;
+		this.keys1 = driverKeyFields1;
+		this.keys2 = driverKeyFields2;
+		this.sortOrders = driverSortOrders;
+
+		// a broadcast input is replicated to every parallel instance of this node
+		if (this.input1.getShipStrategy() == ShipStrategyType.BROADCAST) {
+			this.input1.setReplicationFactor(getParallelism());
+		}
+		if (this.input2.getShipStrategy() == ShipStrategyType.BROADCAST) {
+			this.input2.setReplicationFactor(getParallelism());
+		}
+
+		mergeBranchPlanMaps(input1.getSource(), input2.getSource());
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Gets the template of this node as a {@link TwoInputNode}.
+	 *
+	 * @return The template.
+	 * @throws RuntimeException If the template is not a {@link TwoInputNode}.
+	 */
+	public TwoInputNode getTwoInputNode() {
+		if (this.template instanceof TwoInputNode) {
+			return (TwoInputNode) this.template;
+		} else {
+			throw new RuntimeException("The template of this plan node is not a TwoInputNode: " + this.template);
+		}
+	}
+
+	public FieldList getKeysForInput1() {
+		return this.keys1;
+	}
+
+	public FieldList getKeysForInput2() {
+		return this.keys2;
+	}
+
+	public boolean[] getSortOrders() {
+		return this.sortOrders;
+	}
+
+	public TypeComparatorFactory<?> getComparator1() {
+		return this.comparator1;
+	}
+
+	public TypeComparatorFactory<?> getComparator2() {
+		return this.comparator2;
+	}
+
+	public void setComparator1(TypeComparatorFactory<?> comparator) {
+		this.comparator1 = comparator;
+	}
+
+	public void setComparator2(TypeComparatorFactory<?> comparator) {
+		this.comparator2 = comparator;
+	}
+
+	public TypePairComparatorFactory<?, ?> getPairComparator() {
+		return this.pairComparator;
+	}
+
+	public void setPairComparator(TypePairComparatorFactory<?, ?> comparator) {
+		this.pairComparator = comparator;
+	}
+
+	/**
+	 * Gets the first input channel to this node.
+	 *
+	 * @return The first input channel to this node.
+	 */
+	public Channel getInput1() {
+		return this.input1;
+	}
+
+	/**
+	 * Gets the second input channel to this node.
+	 *
+	 * @return The second input channel to this node.
+	 */
+	public Channel getInput2() {
+		return this.input2;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+
+	@Override
+	public void accept(Visitor<PlanNode> visitor) {
+		if (visitor.preVisit(this)) {
+			this.input1.getSource().accept(visitor);
+			this.input2.getSource().accept(visitor);
+
+			// tolerate absent broadcast inputs, consistent with getPredecessors()
+			if (getBroadcastInputs() != null) {
+				for (Channel broadcastInput : getBroadcastInputs()) {
+					broadcastInput.getSource().accept(visitor);
+				}
+			}
+
+			visitor.postVisit(this);
+		}
+	}
+
+
+	@Override
+	public Iterable<PlanNode> getPredecessors() {
+		if (getBroadcastInputs() == null || getBroadcastInputs().isEmpty()) {
+			return Arrays.asList(this.input1.getSource(), this.input2.getSource());
+		} else {
+			List<PlanNode> preds = new ArrayList<PlanNode>();
+
+			preds.add(input1.getSource());
+			preds.add(input2.getSource());
+
+			for (Channel c : getBroadcastInputs()) {
+				preds.add(c.getSource());
+			}
+
+			return preds;
+		}
+	}
+
+	/**
+	 * Returns the two regular input channels; broadcast inputs are not included.
+	 */
+	@Override
+	public Iterable<Channel> getInputs() {
+		return Arrays.asList(this.input1, this.input2);
+	}
+
+	/**
+	 * Searches the first input, then the second input, then the broadcast inputs for the given
+	 * source. Once the source is found via one input, the remaining inputs are not inspected.
+	 * A path is reported as dammed if a dam was found below, or if the input's local strategy
+	 * dams, its temp mode breaks the pipeline, or the driver fully dams that input. Paths via
+	 * broadcast inputs are always reported as dammed.
+	 */
+	@Override
+	public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
+		if (source == this) {
+			return FOUND_SOURCE;
+		}
+
+		// check first input
+		SourceAndDamReport res1 = this.input1.getSource().hasDamOnPathDownTo(source);
+		if (res1 == FOUND_SOURCE_AND_DAM) {
+			return FOUND_SOURCE_AND_DAM;
+		}
+		else if (res1 == FOUND_SOURCE) {
+			if (this.input1.getLocalStrategy().dams() || this.input1.getTempMode().breaksPipeline() ||
+					getDriverStrategy().firstDam() == DamBehavior.FULL_DAM) {
+				return FOUND_SOURCE_AND_DAM;
+			} else {
+				return FOUND_SOURCE;
+			}
+		}
+		else {
+			SourceAndDamReport res2 = this.input2.getSource().hasDamOnPathDownTo(source);
+			if (res2 == FOUND_SOURCE_AND_DAM) {
+				return FOUND_SOURCE_AND_DAM;
+			}
+			else if (res2 == FOUND_SOURCE) {
+				if (this.input2.getLocalStrategy().dams() || this.input2.getTempMode().breaksPipeline() ||
+						getDriverStrategy().secondDam() == DamBehavior.FULL_DAM) {
+					return FOUND_SOURCE_AND_DAM;
+				} else {
+					return FOUND_SOURCE;
+				}
+			}
+			else {
+				// NOT_FOUND
+				// check the broadcast inputs (tolerating their absence, as getPredecessors() does)
+				if (getBroadcastInputs() != null) {
+					for (NamedChannel nc : getBroadcastInputs()) {
+						SourceAndDamReport bcRes = nc.getSource().hasDamOnPathDownTo(source);
+						if (bcRes != NOT_FOUND) {
+							// broadcast inputs are always dams
+							return FOUND_SOURCE_AND_DAM;
+						}
+					}
+				}
+				return NOT_FOUND;
+			}
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/FlinkPlan.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/FlinkPlan.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/FlinkPlan.java
new file mode 100644
index 0000000..d146c83
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/FlinkPlan.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plan;
+
+/**
+ * A common interface for compiled Flink plans for both batch and streaming
+ * processing programs.
+ *
+ */
+public interface FlinkPlan {
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/IterationPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/IterationPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/IterationPlanNode.java
new file mode 100644
index 0000000..38f76b2
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/IterationPlanNode.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plan;
+
+import org.apache.flink.optimizer.dag.IterationNode;
+import org.apache.flink.util.Visitor;
+
+/**
+ * Interface for plan nodes that are associated with an {@link IterationNode} of the optimizer DAG
+ * and expose a separate visitation entry point for their step function.
+ */
+public interface IterationPlanNode {
+
+ /**
+ * Gets the iteration node of the optimizer DAG that is associated with this plan node.
+ *
+ * @return The associated iteration node.
+ */
+ IterationNode getIterationNode();
+
+ /**
+ * Applies the given visitor to the step function of the iteration.
+ * NOTE(review): the exact traversal is defined by the implementations; confirm there.
+ *
+ * @param visitor The visitor to apply.
+ */
+ void acceptForStepFunction(Visitor<PlanNode> visitor);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/NAryUnionPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/NAryUnionPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/NAryUnionPlanNode.java
new file mode 100644
index 0000000..3650eea
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/NAryUnionPlanNode.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.plan;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.flink.optimizer.costs.Costs;
+import org.apache.flink.optimizer.dag.BinaryUnionNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.util.IterableIterator;
+import org.apache.flink.util.Visitor;
+
+/**
+ * A union operation over multiple inputs (2 or more).
+ * <p>
+ * The node runs no driver strategy ({@link DriverStrategy#NONE}). It starts with default local
+ * properties and default node costs, and takes its global properties and cumulative costs from the
+ * creator.
+ */
+public class NAryUnionPlanNode extends PlanNode {
+
+ private final List<Channel> inputs;
+
+ /**
+ * Creates a union plan node over the given input channels.
+ *
+ * @param template The union node of the optimizer DAG this plan node is created for.
+ * @param inputs The input channels of the union (stored directly, not copied).
+ * @param gProps The global properties of the union's result.
+ * @param cumulativeCosts The cumulative costs of this node and all of its inputs.
+ */
+ public NAryUnionPlanNode(BinaryUnionNode template, List<Channel> inputs, GlobalProperties gProps,
+ Costs cumulativeCosts)
+ {
+ super(template, "Union", DriverStrategy.NONE);
+
+ this.inputs = inputs;
+ this.globalProps = gProps;
+ this.localProps = new LocalProperties();
+ this.nodeCosts = new Costs();
+ this.cumulativeCosts = cumulativeCosts;
+ }
+
+ /**
+ * Visits this node and the sources of all its inputs. Following the {@link Visitor} contract,
+ * the inputs are traversed, and {@code postVisit} is invoked, only if {@code preVisit} returns
+ * {@code true}.
+ *
+ * @param visitor The visitor to apply.
+ */
+ @Override
+ public void accept(Visitor<PlanNode> visitor) {
+ // a false result from preVisit means "do not descend" (e.g. the node was already visited);
+ // ignoring it would re-traverse the inputs and invoke postVisit again on this node
+ if (visitor.preVisit(this)) {
+ for (Channel c : this.inputs) {
+ c.getSource().accept(visitor);
+ }
+ visitor.postVisit(this);
+ }
+ }
+
+ /**
+ * Gets the list of input channels. The returned list is the internal one, not a copy.
+ *
+ * @return The input channels.
+ */
+ public List<Channel> getListOfInputs() {
+ return this.inputs;
+ }
+
+ /**
+ * Gets an unmodifiable view of the input channels.
+ *
+ * @return The input channels.
+ */
+ @Override
+ public Iterable<Channel> getInputs() {
+ return Collections.unmodifiableList(this.inputs);
+ }
+
+ /**
+ * Gets the source nodes of all input channels, in input order. The returned object is its own
+ * iterator, so it can be iterated only once. Call this method again for another pass.
+ *
+ * @return A single-pass iterable over the predecessor nodes.
+ */
+ @Override
+ public Iterable<PlanNode> getPredecessors() {
+ final Iterator<Channel> channels = this.inputs.iterator();
+ return new IterableIterator<PlanNode>() {
+
+ @Override
+ public boolean hasNext() {
+ return channels.hasNext();
+ }
+
+ @Override
+ public PlanNode next() {
+ return channels.next().getSource();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Iterator<PlanNode> iterator() {
+ return this;
+ }
+ };
+ }
+
+ /**
+ * Not supported by this node.
+ *
+ * @throws UnsupportedOperationException Always.
+ */
+ @Override
+ public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
+ // this node is used after the plan enumeration. consequently, this will never be invoked here
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/NamedChannel.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/NamedChannel.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/NamedChannel.java
new file mode 100644
index 0000000..da97e61
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/NamedChannel.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.plan;
+
+import org.apache.flink.optimizer.dag.TempMode;
+
+/**
+ * A {@link Channel} with an associated name. Plan nodes hold their broadcast inputs as named channels.
+ */
+public class NamedChannel extends Channel {
+
+ private final String name;
+
+ /**
+ * Gets the name of this channel.
+ *
+ * @return The channel's name.
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Creates a named channel that reads from the given source node.
+ *
+ * @param name The name of the channel.
+ * @param sourceNode The node that produces the data sent through this channel.
+ */
+ public NamedChannel(String name, PlanNode sourceNode) {
+ super(sourceNode);
+ this.name = name;
+ }
+
+ /**
+ * Creates a named channel that reads from the given source node, using the given temp mode.
+ *
+ * @param name The name of the channel.
+ * @param sourceNode The node that produces the data sent through this channel.
+ * @param tempMode The temp mode of the channel.
+ */
+ public NamedChannel(String name, PlanNode sourceNode, TempMode tempMode) {
+ super(sourceNode, tempMode);
+ this.name = name;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java
new file mode 100644
index 0000000..d56be87
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plan;
+
+import java.util.Collection;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.util.Visitable;
+import org.apache.flink.util.Visitor;
+
+/**
+ * The execution plan generated by the Optimizer. It contains {@link PlanNode}s
+ * and {@link Channel}s that describe exactly how the program should be executed.
+ * It defines all ship strategies (local pipe, shuffle, broadcast, rebalance), all
+ * operator strategies (sort-merge join, hash join, sorted grouping, ...),
+ * and the data exchange modes (batched, pipelined).
+ */
+public class OptimizedPlan implements FlinkPlan, Visitable<PlanNode> {
+
+ /** The program plan from which this optimized plan was created. */
+ private final Plan originalProgram;
+
+ /** The name of the job. */
+ private final String jobName;
+
+ /** The data sources of the plan. */
+ private final Collection<SourcePlanNode> dataSources;
+
+ /** The data sinks of the plan. */
+ private final Collection<SinkPlanNode> dataSinks;
+
+ /** All nodes of the plan. */
+ private final Collection<PlanNode> allNodes;
+
+ /**
+ * Creates a new optimized plan. The plan is fully described by its data sources, its data
+ * sinks, and the collection of all its nodes. The collections are stored as given, not copied.
+ *
+ * @param sources The data sources.
+ * @param sinks The data sinks.
+ * @param allNodes A collection containing all nodes of the plan.
+ * @param jobName The name of the job.
+ * @param programPlan The original program plan this optimized plan was created from.
+ */
+ public OptimizedPlan(Collection<SourcePlanNode> sources, Collection<SinkPlanNode> sinks,
+ Collection<PlanNode> allNodes, String jobName, Plan programPlan) {
+ this.originalProgram = programPlan;
+ this.jobName = jobName;
+ this.allNodes = allNodes;
+ this.dataSinks = sinks;
+ this.dataSources = sources;
+ }
+
+ /**
+ * Returns the data sources of this plan (the internal collection, not a copy).
+ *
+ * @return The data sources.
+ */
+ public Collection<SourcePlanNode> getDataSources() {
+ return this.dataSources;
+ }
+
+ /**
+ * Returns the data sinks of this plan (the internal collection, not a copy).
+ *
+ * @return The data sinks.
+ */
+ public Collection<SinkPlanNode> getDataSinks() {
+ return this.dataSinks;
+ }
+
+ /**
+ * Returns all nodes of this plan (the internal collection, not a copy).
+ *
+ * @return All nodes.
+ */
+ public Collection<PlanNode> getAllNodes() {
+ return this.allNodes;
+ }
+
+ /**
+ * Returns the name of the job.
+ *
+ * @return The job name.
+ */
+ public String getJobName() {
+ return this.jobName;
+ }
+
+ /**
+ * Returns the original program plan this optimized plan was created from.
+ *
+ * @return The original program plan.
+ */
+ public Plan getOriginalPactPlan() {
+ return this.originalProgram;
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Applies the given visitor top down to the nodes of this plan, starting at each data sink.
+ *
+ * @param visitor The visitor to apply to the nodes in this plan.
+ * @see org.apache.flink.util.Visitable#accept(org.apache.flink.util.Visitor)
+ */
+ @Override
+ public void accept(Visitor<PlanNode> visitor) {
+ for (SinkPlanNode sink : this.dataSinks) {
+ sink.accept(visitor);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
new file mode 100644
index 0000000..6f634fb
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plan;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.costs.Costs;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dag.OptimizerNode.UnclosedBranchDescriptor;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.plandump.DumpableConnection;
+import org.apache.flink.optimizer.plandump.DumpableNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.util.Visitable;
+
+/**
+ * A candidate node of the optimizer's physical plan. Each plan node is one concrete alternative for executing
+ * the {@link OptimizerNode} (its "template") it was created from. It records the chosen driver strategy, the
+ * global and local properties of the data it produces, and its costs. Its inputs are {@link Channel}s, which
+ * realize the data exchange between operators through a shipping strategy (establishing global properties)
+ * and a local strategy (establishing local properties).
+ * <p>
+ * Because we currently deal only with plans where the operator order is fixed, many properties are equal
+ * among candidates and are determined prior to the enumeration (such as for example constant/dynamic path membership).
+ * Hence, many methods will delegate to the {@code OptimizerNode} that represents the node this candidate was
+ * created for.
+ */
+public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<PlanNode> {
+
+ protected final OptimizerNode template; // the optimizer DAG node this candidate was created for
+
+ protected final List<Channel> outChannels; // channels leading to the successors of this node
+
+ private List<NamedChannel> broadcastInputs; // broadcast inputs; initialized empty and never set to null
+
+ private final String nodeName;
+
+ private DriverStrategy driverStrategy; // the driver strategy of this node (e.g. sorting or hashing based)
+
+ protected LocalProperties localProps; // local properties of the data produced by this node
+
+ protected GlobalProperties globalProps; // global properties of the data produced by this node
+
+ protected Map<OptimizerNode, PlanNode> branchPlan; // candidate chosen at each branch point, keyed by the branching node; may be null
+
+ protected Costs nodeCosts; // the costs incurred by this node
+
+ protected Costs cumulativeCosts; // the cumulative costs of all operators in the sub-tree
+
+ private double relativeMemoryPerSubTask; // the relative memory dedicated to each sub-task (a relative value, not bytes)
+
+ private int parallelism; // initialized from the template; may be changed via setParallelism()
+
+ private boolean pFlag; // flag for the internal pruning algorithm
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Creates a new plan candidate for the given optimizer node. The parallelism is taken from the template.
+ * If the template is a branching node, this candidate registers itself as the chosen candidate for that
+ * branch point.
+ *
+ * @param template The optimizer DAG node this candidate is created for.
+ * @param nodeName The name of this plan node.
+ * @param strategy The driver strategy of this node.
+ */
+ public PlanNode(OptimizerNode template, String nodeName, DriverStrategy strategy) {
+ this.outChannels = new ArrayList<Channel>(2);
+ this.broadcastInputs = new ArrayList<NamedChannel>();
+ this.template = template;
+ this.nodeName = nodeName;
+ this.driverStrategy = strategy;
+
+ this.parallelism = template.getParallelism();
+
+ // check, if there is branch at this node. if yes, this candidate must be associated with
+ // the branching template node.
+ if (template.isBranching()) {
+ this.branchPlan = new HashMap<OptimizerNode, PlanNode>(6);
+ this.branchPlan.put(template, this);
+ }
+ }
+
+ /**
+ * Merges the branch plan maps of the two given predecessors into this node's branch plan map.
+ *
+ * @param pred1 The first predecessor.
+ * @param pred2 The second predecessor.
+ */
+ protected void mergeBranchPlanMaps(PlanNode pred1, PlanNode pred2) {
+ mergeBranchPlanMaps(pred1.branchPlan, pred2.branchPlan);
+ }
+
+ /**
+ * Records, for every open (unclosed) branch of the template node, the candidate chosen at that branch
+ * point. The candidate is looked up first in {@code branchPlan1} and then in {@code branchPlan2}. Branch
+ * points found in neither map are skipped. Does nothing if the template has no open branches.
+ *
+ * @param branchPlan1 The first branch plan map to take candidates from (may be null).
+ * @param branchPlan2 The second branch plan map to take candidates from (may be null).
+ */
+ protected void mergeBranchPlanMaps(Map<OptimizerNode, PlanNode> branchPlan1, Map<OptimizerNode, PlanNode> branchPlan2) {
+ // merge the branchPlan maps according to the template's open (unclosed) branches
+ if (this.template.hasUnclosedBranches()) {
+ if (this.branchPlan == null) {
+ this.branchPlan = new HashMap<OptimizerNode, PlanNode>(8);
+ }
+
+ for (UnclosedBranchDescriptor uc : this.template.getOpenBranches()) {
+ OptimizerNode brancher = uc.getBranchingNode();
+ PlanNode selectedCandidate = null;
+
+ if (branchPlan1 != null) {
+ // predecessor 1 has branching children, see if it got the branch we are looking for
+ selectedCandidate = branchPlan1.get(brancher);
+ }
+
+ if (selectedCandidate == null && branchPlan2 != null) {
+ // predecessor 2 has branching children, see if it got the branch we are looking for
+ selectedCandidate = branchPlan2.get(brancher);
+ }
+
+ // it may be that the branch candidate is only found once the broadcast variables are set
+ if (selectedCandidate != null) {
+ this.branchPlan.put(brancher, selectedCandidate);
+ }
+ }
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Accessors
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Gets the node from the optimizer DAG for which this plan candidate node was created.
+ *
+ * @return The optimizer's DAG node.
+ */
+ public OptimizerNode getOriginalOptimizerNode() {
+ return this.template;
+ }
+
+ /**
+ * Gets the program operator that this node represents in the plan.
+ *
+ * @return The program operator this node represents in the plan.
+ */
+ public Operator<?> getProgramOperator() {
+ return this.template.getOperator();
+ }
+
+ /**
+ * Gets the name of the plan node.
+ *
+ * @return The name of the plan node.
+ */
+ public String getNodeName() {
+ return this.nodeName;
+ }
+
+ /**
+ * Gets the memory consumer weight of this node. It is 1 if the driver strategy materializes data,
+ * and 0 otherwise.
+ *
+ * @return The memory consumer weight (0 or 1).
+ */
+ public int getMemoryConsumerWeight() {
+ return this.driverStrategy.isMaterializing() ? 1 : 0;
+ }
+
+ /**
+ * Gets the relative memory dedicated to each sub-task for this node.
+ *
+ * @return The relative memory per sub-task (a relative value, not a number of bytes).
+ */
+ public double getRelativeMemoryPerSubTask() {
+ return this.relativeMemoryPerSubTask;
+ }
+
+ /**
+ * Sets the relative memory dedicated to each sub-task for this node.
+ *
+ * @param relativeMemoryPerSubtask The relative memory per sub-task
+ */
+ public void setRelativeMemoryPerSubtask(double relativeMemoryPerSubtask) {
+ this.relativeMemoryPerSubTask = relativeMemoryPerSubtask;
+ }
+
+ /**
+ * Gets the driver strategy of this node. For example, for a join (<i>match</i>) operator, it determines
+ * whether a merge or a hybrid hash strategy is used.
+ *
+ * @return The driver strategy.
+ */
+ public DriverStrategy getDriverStrategy() {
+ return this.driverStrategy;
+ }
+
+ /**
+ * Sets the driver strategy for this node. Usually should not be changed.
+ *
+ * @param newDriverStrategy The driver strategy.
+ */
+ public void setDriverStrategy(DriverStrategy newDriverStrategy) {
+ this.driverStrategy = newDriverStrategy;
+ }
+
+ /**
+ * Initializes the global and local properties of this node. This may only be done while both are still unset.
+ *
+ * @param globals The global properties.
+ * @param locals The local properties.
+ * @throws IllegalStateException If the global or the local properties have already been set.
+ */
+ public void initProperties(GlobalProperties globals, LocalProperties locals) {
+ if (this.globalProps != null || this.localProps != null) {
+ throw new IllegalStateException();
+ }
+ this.globalProps = globals;
+ this.localProps = locals;
+ }
+
+ /**
+ * Gets the local properties from this PlanNode.
+ *
+ * @return The local properties.
+ */
+ public LocalProperties getLocalProperties() {
+ return this.localProps;
+ }
+
+ /**
+ * Gets the global properties from this PlanNode.
+ *
+ * @return The global properties.
+ */
+ public GlobalProperties getGlobalProperties() {
+ return this.globalProps;
+ }
+
+ /**
+ * Gets the costs incurred by this node. The costs reflect also the costs incurred by the shipping strategies
+ * of the incoming connections.
+ *
+ * @return The node-costs, or null, if not yet set.
+ */
+ public Costs getNodeCosts() {
+ return this.nodeCosts;
+ }
+
+ /**
+ * Gets the cumulative costs of this node. The cumulative costs are the sum of the costs
+ * of this node and of all nodes in the subtree below this node.
+ *
+ * @return The cumulative costs, or null, if not yet set.
+ */
+ public Costs getCumulativeCosts() {
+ return this.cumulativeCosts;
+ }
+
+ /**
+ * Gets this node's share of its cumulative costs. The share is a copy of the cumulative costs, divided by
+ * the number of outgoing connections of the template node (if there is at least one). Successors add this
+ * share in {@link #setCosts(Costs)}, so a node that feeds several successors is not counted in full by each.
+ *
+ * @return The cost share, or null, if the cumulative costs are not yet set.
+ */
+ public Costs getCumulativeCostsShare() {
+ if (this.cumulativeCosts == null) {
+ return null;
+ } else {
+ Costs result = cumulativeCosts.clone();
+ if (this.template.getOutgoingConnections() != null) {
+ int outDegree = this.template.getOutgoingConnections().size();
+ if (outDegree > 0) {
+ result.divideBy(outDegree);
+ }
+ }
+
+ return result;
+ }
+ }
+
+
+ /**
+ * Sets the basic cost for this node to the given value, and sets the cumulative costs
+ * to those costs plus the cost shares of all inputs (regular and broadcast).
+ *
+ * @param nodeCosts The already known costs for this node
+ * (as computed by a concrete {@code OptimizerNode} subclass).
+ * @throws CompilerException If the costs of a predecessor or broadcast input have not been computed yet.
+ */
+ public void setCosts(Costs nodeCosts) {
+ // set the node costs
+ this.nodeCosts = nodeCosts;
+
+ // the cumulative costs are the node costs plus the costs of all inputs
+ this.cumulativeCosts = nodeCosts.clone();
+
+ // add all the normal inputs
+ for (PlanNode pred : getPredecessors()) {
+
+ Costs parentCosts = pred.getCumulativeCostsShare();
+ if (parentCosts != null) {
+ this.cumulativeCosts.addCosts(parentCosts);
+ } else {
+ throw new CompilerException("Trying to set the costs of an operator before the predecessor costs are computed.");
+ }
+ }
+
+ // add all broadcast variable inputs
+ if (this.broadcastInputs != null) {
+ for (NamedChannel nc : this.broadcastInputs) {
+ Costs bcInputCost = nc.getSource().getCumulativeCostsShare();
+ if (bcInputCost != null) {
+ this.cumulativeCosts.addCosts(bcInputCost);
+ } else {
+ throw new CompilerException("Trying to set the costs of an operator before the broadcast input costs are computed.");
+ }
+ }
+ }
+ }
+
+ /**
+ * Sets the parallelism of this node, overriding the value taken from the template.
+ *
+ * @param parallelism The parallelism.
+ */
+ public void setParallelism(int parallelism) {
+ this.parallelism = parallelism;
+ }
+
+ /**
+ * Gets the parallelism of this node.
+ *
+ * @return The parallelism.
+ */
+ public int getParallelism() {
+ return this.parallelism;
+ }
+
+ /**
+ * Gets the minimal memory across all sub-tasks, as reported by the template node.
+ *
+ * @return The guaranteed available memory.
+ */
+ public long getGuaranteedAvailableMemory() {
+ return this.template.getMinimalMemoryAcrossAllSubTasks();
+ }
+
+ /**
+ * Gets the map from branching optimizer nodes to the candidate chosen at each of them.
+ *
+ * @return The branch plan map (the internal map, not a copy), or null if none has been created.
+ */
+ public Map<OptimizerNode, PlanNode> getBranchPlan() {
+ return branchPlan;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Input, Predecessors, Successors
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Gets the regular (non-broadcast) input channels of this node.
+ *
+ * @return The input channels.
+ */
+ public abstract Iterable<Channel> getInputs();
+
+ /**
+ * Gets the nodes that feed the regular (non-broadcast) inputs of this node.
+ *
+ * @return The predecessor nodes.
+ */
+ @Override
+ public abstract Iterable<PlanNode> getPredecessors();
+
+ /**
+ * Sets a list of all broadcast inputs attached to this node and merges their branch plans into this
+ * node's branch plan. A null list leaves the current broadcast inputs unchanged. Afterwards, if the
+ * template has open branches, checks that a candidate is known for every branch point.
+ *
+ * @param broadcastInputs The broadcast inputs, or null.
+ * @throws CompilerException If no candidate was found for one of the template's open branch points.
+ */
+ public void setBroadcastInputs(List<NamedChannel> broadcastInputs) {
+ if (broadcastInputs != null) {
+ this.broadcastInputs = broadcastInputs;
+
+ // update the branch map
+ for (NamedChannel nc : broadcastInputs) {
+ PlanNode source = nc.getSource();
+
+ mergeBranchPlanMaps(branchPlan, source.branchPlan);
+ }
+ }
+
+ // do a sanity check that if we are branching, we have now candidates for each branch point
+ if (this.template.hasUnclosedBranches()) {
+ if (this.branchPlan == null) {
+ throw new CompilerException("Branching and rejoining logic did not find a candidate for the branching point.");
+ }
+
+ for (UnclosedBranchDescriptor uc : this.template.getOpenBranches()) {
+ OptimizerNode brancher = uc.getBranchingNode();
+ if (this.branchPlan.get(brancher) == null) {
+ throw new CompilerException("Branching and rejoining logic did not find a candidate for the branching point.");
+ }
+ }
+ }
+ }
+
+ /**
+ * Gets a list of all broadcast inputs attached to this node. Never null; empty if there are none.
+ */
+ public List<NamedChannel> getBroadcastInputs() {
+ return this.broadcastInputs;
+ }
+
+ /**
+ * Adds a channel to a successor node to this node.
+ *
+ * @param channel The channel to the successor.
+ */
+ public void addOutgoingChannel(Channel channel) {
+ this.outChannels.add(channel);
+ }
+
+ /**
+ * Gets a list of all outgoing channels leading to successors.
+ *
+ * @return A list of all channels leading to successors.
+ */
+ public List<Channel> getOutgoingChannels() {
+ return this.outChannels;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Miscellaneous
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Adds each given unique field combination to the global properties and to the local properties of
+ * this node. The local properties object is replaced by the result of {@code addUniqueFields}.
+ * Does nothing for a null or empty set.
+ *
+ * @param uniqueFieldCombinations The unique field combinations to add.
+ */
+ public void updatePropertiesWithUniqueSets(Set<FieldSet> uniqueFieldCombinations) {
+ if (uniqueFieldCombinations == null || uniqueFieldCombinations.isEmpty()) {
+ return;
+ }
+ for (FieldSet fields : uniqueFieldCombinations) {
+ this.globalProps.addUniqueFieldCombination(fields);
+ this.localProps = this.localProps.addUniqueFields(fields);
+ }
+ }
+
+ /**
+ * Gets the candidate chosen at the given branch point.
+ *
+ * @param branchPoint The branching optimizer node.
+ * @return The chosen candidate, or null if there is no branch plan or no entry for the branch point.
+ */
+ public PlanNode getCandidateAtBranchPoint(OptimizerNode branchPoint) {
+ if (branchPlan == null) {
+ return null;
+ } else {
+ return this.branchPlan.get(branchPoint);
+ }
+ }
+
+ /**
+ * Sets the pruning marker to true.
+ */
+ public void setPruningMarker() {
+ this.pFlag = true;
+ }
+
+ /**
+ * Checks whether the pruning marker was set.
+ *
+ * @return True, if the pruning marker was set, false otherwise.
+ */
+ public boolean isPruneMarkerSet() {
+ return this.pFlag;
+ }
+
+ /**
+ * Checks whether the template node is on the dynamic path (delegates to the template).
+ *
+ * @return True, if the template node is on the dynamic path.
+ */
+ public boolean isOnDynamicPath() {
+ return this.template.isOnDynamicPath();
+ }
+
+ /**
+ * Gets the cost weight of the template node (delegates to the template).
+ *
+ * @return The cost weight.
+ */
+ public int getCostWeight() {
+ return this.template.getCostWeight();
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Checks whether this node has a dam on the way down to the given source node. This method
+ * returns either that (a) the source node is not found as a (transitive) child of this node,
+ * (b) the node is found, but no dam is on the path, or (c) the node is found and a dam is on
+ * the path.
+ *
+ * @param source The node on the path to which the dam is sought.
+ * @return The result whether the node is found and whether a dam is on the path.
+ */
+ public abstract SourceAndDamReport hasDamOnPathDownTo(PlanNode source);
+
+ /**
+ * Checks whether the given feedback properties of a partial solution meet the requirements of the
+ * channels on the paths from this node down to that partial solution. Only the regular inputs are
+ * searched; broadcast inputs are not.
+ *
+ * @param partialSolution The partial solution node to search for.
+ * @param feedbackGlobal The global properties of the feedback.
+ * @param feedbackLocal The local properties of the feedback.
+ * @return One of the following:
+ * <ul>
+ * <li>{@code NO_PARTIAL_SOLUTION}, if the partial solution is not reachable through the inputs.</li>
+ * <li>{@code NOT_MET}, if some channel requires properties that the feedback does not provide.</li>
+ * <li>{@code MET}, if the requirements are met on all paths that lead to the partial solution.</li>
+ * <li>{@code PENDING_LOCAL_MET} or {@code PENDING}, if the answer is still open.</li>
+ * </ul>
+ */
+ public FeedbackPropertiesMeetRequirementsReport checkPartialSolutionPropertiesMet(PlanNode partialSolution, GlobalProperties feedbackGlobal, LocalProperties feedbackLocal) {
+ if (this == partialSolution) {
+ // reached the partial solution itself; the outcome depends on the channels above it
+ return FeedbackPropertiesMeetRequirementsReport.PENDING;
+ }
+
+ boolean found = false;
+ boolean allMet = true;
+ boolean allLocallyMet = true;
+
+ for (Channel input : getInputs()) {
+ FeedbackPropertiesMeetRequirementsReport inputState = input.getSource().checkPartialSolutionPropertiesMet(partialSolution, feedbackGlobal, feedbackLocal);
+
+ if (inputState == FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION) {
+ // this input does not lead to the partial solution
+ continue;
+ }
+ else if (inputState == FeedbackPropertiesMeetRequirementsReport.MET) {
+ found = true;
+ continue;
+ }
+ else if (inputState == FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
+ // a single violated requirement decides the overall result
+ return FeedbackPropertiesMeetRequirementsReport.NOT_MET;
+ }
+ else {
+ found = true;
+
+ // the partial solution was on the path here. check whether the channel requires
+ // certain properties that are met, or whether the channel introduces new properties
+
+ // if the plan introduces new global properties, then we can stop looking whether
+ // the feedback properties are sufficient to meet the requirements
+ if (input.getShipStrategy() != ShipStrategyType.FORWARD && input.getShipStrategy() != ShipStrategyType.NONE) {
+ continue;
+ }
+
+ // first check whether this channel requires something that is not met
+ if (input.getRequiredGlobalProps() != null && !input.getRequiredGlobalProps().isMetBy(feedbackGlobal)) {
+ return FeedbackPropertiesMeetRequirementsReport.NOT_MET;
+ }
+
+ // in general, not everything is met here already
+ allMet = false;
+
+ // if the plan introduces new local properties, we can stop checking for matching local properties
+ if (inputState != FeedbackPropertiesMeetRequirementsReport.PENDING_LOCAL_MET) {
+
+ if (input.getLocalStrategy() == LocalStrategy.NONE) {
+ // no local strategy on this channel, so the feedback's local properties must satisfy its requirements
+
+ if (input.getRequiredLocalProps() != null && !input.getRequiredLocalProps().isMetBy(feedbackLocal)) {
+ return FeedbackPropertiesMeetRequirementsReport.NOT_MET;
+ }
+
+ allLocallyMet = false;
+ }
+ }
+ }
+ }
+
+ if (!found) {
+ return FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION;
+ } else if (allMet) {
+ return FeedbackPropertiesMeetRequirementsReport.MET;
+ } else if (allLocallyMet) {
+ return FeedbackPropertiesMeetRequirementsReport.PENDING_LOCAL_MET;
+ } else {
+ return FeedbackPropertiesMeetRequirementsReport.PENDING;
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+
+ /**
+ * Returns a description consisting of the template name, the operator name, the driver strategy,
+ * and the global and local properties.
+ */
+ @Override
+ public String toString() {
+ return this.template.getName() + " \"" + getProgramOperator().getName() + "\" : " + this.driverStrategy +
+ " [[ " + this.globalProps + " ]] [[ " + this.localProps + " ]]";
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /** Returns the template optimizer node of this plan node. */
+ @Override
+ public OptimizerNode getOptimizerNode() {
+ return this.template;
+ }
+
+ /** Returns this plan node itself. */
+ @Override
+ public PlanNode getPlanNode() {
+ return this;
+ }
+
+ /**
+ * Gets all inputs for plan dumping. The regular inputs come first, followed by the broadcast inputs.
+ *
+ * @return A new list containing all inputs.
+ */
+ @Override
+ public Iterable<DumpableConnection<PlanNode>> getDumpableInputs() {
+ List<DumpableConnection<PlanNode>> allInputs = new ArrayList<DumpableConnection<PlanNode>>();
+
+ for (Channel c : getInputs()) {
+ allInputs.add(c);
+ }
+
+ for (NamedChannel c : getBroadcastInputs()) {
+ allInputs.add(c);
+ }
+
+ return allInputs;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Result of {@link #hasDamOnPathDownTo(PlanNode)}:
+ * <ul>
+ * <li>{@code NOT_FOUND}: the source is not a (transitive) child of the node.</li>
+ * <li>{@code FOUND_SOURCE}: the source is found, and no dam is on the path.</li>
+ * <li>{@code FOUND_SOURCE_AND_DAM}: the source is found, and a dam is on the path.</li>
+ * </ul>
+ */
+ public static enum SourceAndDamReport {
+ NOT_FOUND, FOUND_SOURCE, FOUND_SOURCE_AND_DAM;
+ }
+
+
+
+ /**
+ * Result of {@link #checkPartialSolutionPropertiesMet(PlanNode, GlobalProperties, LocalProperties)}.
+ */
+ public static enum FeedbackPropertiesMeetRequirementsReport {
+ /** Indicates that the path does not lead to the partial solution and is therefore irrelevant. */
+ NO_PARTIAL_SOLUTION,
+
+ /** Indicates that whether the properties are met is still pending and depends on
+ * both the global and the local properties. */
+ PENDING,
+
+ /** Indicates that whether the properties are met is still pending and depends on
+ * the global properties only; the local properties are met. */
+ PENDING_LOCAL_MET,
+
+ /** Indicates that the properties have been determined to be met. */
+ MET,
+
+ /** Indicates that the properties have been determined not to be met. */
+ NOT_MET;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java
new file mode 100644
index 0000000..b928be7
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.plan;
+
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dag.SingleInputNode;
+import org.apache.flink.runtime.operators.DamBehavior;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.util.Visitor;
+
+/**
+ *
+ */
+public class SingleInputPlanNode extends PlanNode {
+
+ protected final Channel input;
+
+ protected final FieldList[] driverKeys;
+
+ protected final boolean[][] driverSortOrders;
+
+ private TypeComparatorFactory<?>[] comparators;
+
+ public Object postPassHelper;
+
+ // --------------------------------------------------------------------------------------------
+
+ public SingleInputPlanNode(OptimizerNode template, String nodeName, Channel input, DriverStrategy driverStrategy) {
+ this(template, nodeName, input, driverStrategy, null, null);
+ }
+
+ public SingleInputPlanNode(OptimizerNode template, String nodeName, Channel input,
+ DriverStrategy driverStrategy, FieldList driverKeyFields)
+ {
+ this(template, nodeName, input, driverStrategy, driverKeyFields, getTrueArray(driverKeyFields.size()));
+ }
+
+ public SingleInputPlanNode(OptimizerNode template, String nodeName, Channel input,
+ DriverStrategy driverStrategy, FieldList driverKeyFields, boolean[] driverSortOrders)
+ {
+ super(template, nodeName, driverStrategy);
+ this.input = input;
+
+ this.comparators = new TypeComparatorFactory<?>[driverStrategy.getNumRequiredComparators()];
+ this.driverKeys = new FieldList[driverStrategy.getNumRequiredComparators()];
+ this.driverSortOrders = new boolean[driverStrategy.getNumRequiredComparators()][];
+
+ if(driverStrategy.getNumRequiredComparators() > 0) {
+ this.driverKeys[0] = driverKeyFields;
+ this.driverSortOrders[0] = driverSortOrders;
+ }
+
+ if (this.input.getShipStrategy() == ShipStrategyType.BROADCAST) {
+ this.input.setReplicationFactor(getParallelism());
+ }
+
+ final PlanNode predNode = input.getSource();
+
+ if (predNode.branchPlan != null && !predNode.branchPlan.isEmpty()) {
+
+ if (this.branchPlan == null) {
+ this.branchPlan = new HashMap<OptimizerNode, PlanNode>();
+ }
+ this.branchPlan.putAll(predNode.branchPlan);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public SingleInputNode getSingleInputNode() {
+ if (this.template instanceof SingleInputNode) {
+ return (SingleInputNode) this.template;
+ } else {
+ throw new RuntimeException();
+ }
+ }
+
+ /**
+ * Gets the input channel to this node.
+ *
+ * @return The input channel to this node.
+ */
+ public Channel getInput() {
+ return this.input;
+ }
+
+ /**
+ * Gets the predecessor of this node, i.e. the source of the input channel.
+ *
+ * @return The predecessor of this node.
+ */
+ public PlanNode getPredecessor() {
+ return this.input.getSource();
+ }
+
+ /**
+ * Sets the key field indexes for the specified driver comparator.
+ *
+ * @param keys The key field indexes for the specified driver comparator.
+ * @param id The ID of the driver comparator.
+ */
+ public void setDriverKeyInfo(FieldList keys, int id) {
+ this.setDriverKeyInfo(keys, getTrueArray(keys.size()), id);
+ }
+
+ /**
+ * Sets the key field information for the specified driver comparator.
+ *
+ * @param keys The key field indexes for the specified driver comparator.
+ * @param sortOrder The key sort order for the specified driver comparator.
+ * @param id The ID of the driver comparator.
+ */
+ public void setDriverKeyInfo(FieldList keys, boolean[] sortOrder, int id) {
+ if(id < 0 || id >= driverKeys.length) {
+ throw new CompilerException("Invalid id for driver key information. DriverStrategy requires only "
+ +super.getDriverStrategy().getNumRequiredComparators()+" comparators.");
+ }
+ this.driverKeys[id] = keys;
+ this.driverSortOrders[id] = sortOrder;
+ }
+
+ /**
+ * Gets the key field indexes for the specified driver comparator.
+ *
+ * @param id The id of the driver comparator for which the key field indexes are requested.
+ * @return The key field indexes of the specified driver comparator.
+ */
+ public FieldList getKeys(int id) {
+ return this.driverKeys[id];
+ }
+
+ /**
+ * Gets the sort order for the specified driver comparator.
+ *
+ * @param id The id of the driver comparator for which the sort order is requested.
+ * @return The sort order of the specified driver comparator.
+ */
+ public boolean[] getSortOrders(int id) {
+ return driverSortOrders[id];
+ }
+
+ /**
+ * Gets the specified comparator from this PlanNode.
+ *
+ * @param id The ID of the requested comparator.
+ *
+ * @return The specified comparator.
+ */
+ public TypeComparatorFactory<?> getComparator(int id) {
+ return comparators[id];
+ }
+
+ /**
+ * Sets the specified comparator for this PlanNode.
+ *
+ * @param comparator The comparator to set.
+ * @param id The ID of the comparator to set.
+ */
+ public void setComparator(TypeComparatorFactory<?> comparator, int id) {
+ this.comparators[id] = comparator;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+
+ @Override
+ public void accept(Visitor<PlanNode> visitor) {
+ if (visitor.preVisit(this)) {
+ this.input.getSource().accept(visitor);
+
+ for (Channel broadcastInput : getBroadcastInputs()) {
+ broadcastInput.getSource().accept(visitor);
+ }
+
+ visitor.postVisit(this);
+ }
+ }
+
+
+ @Override
+ public Iterable<PlanNode> getPredecessors() {
+ if (getBroadcastInputs() == null || getBroadcastInputs().isEmpty()) {
+ return Collections.singleton(this.input.getSource());
+ }
+ else {
+ List<PlanNode> preds = new ArrayList<PlanNode>();
+ preds.add(input.getSource());
+
+ for (Channel c : getBroadcastInputs()) {
+ preds.add(c.getSource());
+ }
+
+ return preds;
+ }
+ }
+
+
+ @Override
+ public Iterable<Channel> getInputs() {
+ return Collections.singleton(this.input);
+ }
+
+ @Override
+ public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
+ if (source == this) {
+ return FOUND_SOURCE;
+ }
+ SourceAndDamReport res = this.input.getSource().hasDamOnPathDownTo(source);
+ if (res == FOUND_SOURCE_AND_DAM) {
+ return FOUND_SOURCE_AND_DAM;
+ }
+ else if (res == FOUND_SOURCE) {
+ return (this.input.getLocalStrategy().dams() || this.input.getTempMode().breaksPipeline() ||
+ getDriverStrategy().firstDam() == DamBehavior.FULL_DAM) ?
+ FOUND_SOURCE_AND_DAM : FOUND_SOURCE;
+ }
+ else {
+ // NOT_FOUND
+ // check the broadcast inputs
+
+ for (NamedChannel nc : getBroadcastInputs()) {
+ SourceAndDamReport bcRes = nc.getSource().hasDamOnPathDownTo(source);
+ if (bcRes != NOT_FOUND) {
+ // broadcast inputs are always dams
+ return FOUND_SOURCE_AND_DAM;
+ }
+ }
+ return NOT_FOUND;
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ protected static boolean[] getTrueArray(int length) {
+ final boolean[] a = new boolean[length];
+ for (int i = 0; i < length; i++) {
+ a[i] = true;
+ }
+ return a;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java
new file mode 100644
index 0000000..451484d
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plan;
+
+import java.util.List;
+
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.costs.Costs;
+import org.apache.flink.optimizer.dag.SinkJoiner;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+
+/**
+ *
+ */
+public class SinkJoinerPlanNode extends DualInputPlanNode {
+
+ public SinkJoinerPlanNode(SinkJoiner template, Channel input1, Channel input2) {
+ super(template, "", input1, input2, DriverStrategy.BINARY_NO_OP);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public void setCosts(Costs nodeCosts) {
+ // the plan enumeration logic works as for regular two-input-operators, which is important
+ // because of the branch handling logic. it does pick redistributing network channels
+ // between the sink and the sink joiner, because sinks joiner has a different DOP than the sink.
+ // we discard any cost and simply use the sum of the costs from the two children.
+
+ Costs totalCosts = getInput1().getSource().getCumulativeCosts().clone();
+ totalCosts.addCosts(getInput2().getSource().getCumulativeCosts());
+ super.setCosts(totalCosts);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public void getDataSinks(List<SinkPlanNode> sinks) {
+ final PlanNode in1 = this.input1.getSource();
+ final PlanNode in2 = this.input2.getSource();
+
+ if (in1 instanceof SinkPlanNode) {
+ sinks.add((SinkPlanNode) in1);
+ } else if (in1 instanceof SinkJoinerPlanNode) {
+ ((SinkJoinerPlanNode) in1).getDataSinks(sinks);
+ } else {
+ throw new CompilerException("Illegal child node for a sink joiner utility node: Neither Sink nor Sink Joiner");
+ }
+
+ if (in2 instanceof SinkPlanNode) {
+ sinks.add((SinkPlanNode) in2);
+ } else if (in2 instanceof SinkJoinerPlanNode) {
+ ((SinkJoinerPlanNode) in2).getDataSinks(sinks);
+ } else {
+ throw new CompilerException("Illegal child node for a sink joiner utility node: Neither Sink nor Sink Joiner");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java
new file mode 100644
index 0000000..656e67f
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.plan;
+
+import org.apache.flink.optimizer.dag.DataSinkNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+/**
+ * Plan candidate node for data flow sinks.
+ */
+public class SinkPlanNode extends SingleInputPlanNode
+{
+ /**
+ * Constructs a new sink candidate node that uses <i>NONE</i> as its local strategy. Note that
+ * local sorting and range partitioning are handled by the incoming channel already.
+ *
+ * @param template The template optimizer node that this candidate is created for.
+ */
+ public SinkPlanNode(DataSinkNode template, String nodeName, Channel input) {
+ super(template, nodeName, input, DriverStrategy.NONE);
+
+ this.globalProps = input.getGlobalProperties().clone();
+ this.localProps = input.getLocalProperties().clone();
+ }
+
+ public DataSinkNode getSinkNode() {
+ if (this.template instanceof DataSinkNode) {
+ return (DataSinkNode) this.template;
+ } else {
+ throw new RuntimeException();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SolutionSetPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SolutionSetPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SolutionSetPlanNode.java
new file mode 100644
index 0000000..63093dd
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SolutionSetPlanNode.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.plan;
+
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
+
+import java.util.Collections;
+import java.util.HashMap;
+
+import org.apache.flink.optimizer.costs.Costs;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dag.SolutionSetNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.util.Visitor;
+
+/**
+ * Plan candidate node for partial solution of a bulk iteration.
+ */
+public class SolutionSetPlanNode extends PlanNode {
+
+ private static final Costs NO_COSTS = new Costs();
+
+ private WorksetIterationPlanNode containingIterationNode;
+
+ private final Channel initialInput;
+
+ public Object postPassHelper;
+
+
+ public SolutionSetPlanNode(SolutionSetNode template, String nodeName,
+ GlobalProperties gProps, LocalProperties lProps,
+ Channel initialInput)
+ {
+ super(template, nodeName, DriverStrategy.NONE);
+
+ this.globalProps = gProps;
+ this.localProps = lProps;
+ this.initialInput = initialInput;
+
+ // the node incurs no cost
+ this.nodeCosts = NO_COSTS;
+ this.cumulativeCosts = NO_COSTS;
+
+ if (initialInput.getSource().branchPlan != null && initialInput.getSource().branchPlan.size() > 0) {
+ if (this.branchPlan == null) {
+ this.branchPlan = new HashMap<OptimizerNode, PlanNode>();
+ }
+
+ this.branchPlan.putAll(initialInput.getSource().branchPlan);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public SolutionSetNode getSolutionSetNode() {
+ return (SolutionSetNode) this.template;
+ }
+
+ public WorksetIterationPlanNode getContainingIterationNode() {
+ return this.containingIterationNode;
+ }
+
+ public void setContainingIterationNode(WorksetIterationPlanNode containingIterationNode) {
+ this.containingIterationNode = containingIterationNode;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+
+ @Override
+ public void accept(Visitor<PlanNode> visitor) {
+ if (visitor.preVisit(this)) {
+ visitor.postVisit(this);
+ }
+ }
+
+
+ @Override
+ public Iterable<PlanNode> getPredecessors() {
+ return Collections.<PlanNode>emptyList();
+ }
+
+
+ @Override
+ public Iterable<Channel> getInputs() {
+ return Collections.<Channel>emptyList();
+ }
+
+
+ @Override
+ public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
+ if (source == this) {
+ return FOUND_SOURCE_AND_DAM;
+ }
+
+ SourceAndDamReport res = this.initialInput.getSource().hasDamOnPathDownTo(source);
+ if (res == FOUND_SOURCE_AND_DAM || res == FOUND_SOURCE) {
+ return FOUND_SOURCE_AND_DAM;
+ } else {
+ return NOT_FOUND;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java
new file mode 100644
index 0000000..11b7cc9
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.plan;
+
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
+
+import java.util.Collections;
+
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.optimizer.dag.DataSourceNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.util.Visitor;
+
+/**
+ * Plan candidate node for data flow sources that have no input and no special strategies.
+ */
+public class SourcePlanNode extends PlanNode {
+
+ private TypeSerializerFactory<?> serializer;
+
+ /**
+ * Constructs a new source candidate node that uses <i>NONE</i> as its local strategy.
+ *
+ * @param template The template optimizer node that this candidate is created for.
+ */
+ public SourcePlanNode(DataSourceNode template, String nodeName) {
+ this(template, nodeName, new GlobalProperties(), new LocalProperties());
+ }
+
+ public SourcePlanNode(DataSourceNode template, String nodeName, GlobalProperties gprops, LocalProperties lprops) {
+ super(template, nodeName, DriverStrategy.NONE);
+
+ this.globalProps = gprops;
+ this.localProps = lprops;
+ updatePropertiesWithUniqueSets(template.getUniqueFields());
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public DataSourceNode getDataSourceNode() {
+ return (DataSourceNode) this.template;
+ }
+
+ /**
+ * Gets the serializer from this PlanNode.
+ *
+ * @return The serializer.
+ */
+ public TypeSerializerFactory<?> getSerializer() {
+ return serializer;
+ }
+
+ /**
+ * Sets the serializer for this PlanNode.
+ *
+ * @param serializer The serializer to set.
+ */
+ public void setSerializer(TypeSerializerFactory<?> serializer) {
+ this.serializer = serializer;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+
+ @Override
+ public void accept(Visitor<PlanNode> visitor) {
+ if (visitor.preVisit(this)) {
+ visitor.postVisit(this);
+ }
+ }
+
+
+ @Override
+ public Iterable<PlanNode> getPredecessors() {
+ return Collections.<PlanNode>emptyList();
+ }
+
+
+ @Override
+ public Iterable<Channel> getInputs() {
+ return Collections.<Channel>emptyList();
+ }
+
+
+ @Override
+ public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
+ if (source == this) {
+ return FOUND_SOURCE;
+ } else {
+ return NOT_FOUND;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java
new file mode 100644
index 0000000..880f2e3
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plan;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+/**
+ * Abstract class representing Flink Streaming plans
+ *
+ */
+public abstract class StreamingPlan implements FlinkPlan {
+
+ public abstract JobGraph getJobGraph();
+
+ public abstract String getStreamingPlanAsJSON();
+
+ public abstract void dumpStreamingPlanAsJSON(File file) throws IOException;
+
+}