You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:07:16 UTC
[37/53] [abbrv] flink git commit: [optimizer] Rename optimizer
project to "flink-optimizer" (previously flink-compiler)
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
deleted file mode 100644
index 0cad34e..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
+++ /dev/null
@@ -1,1172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.common.operators.AbstractUdfOperator;
-import org.apache.flink.api.common.operators.CompilerHints;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.costs.CostEstimator;
-import org.apache.flink.optimizer.dataproperties.InterestingProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plandump.DumpableConnection;
-import org.apache.flink.optimizer.plandump.DumpableNode;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.util.Visitable;
-import org.apache.flink.util.Visitor;
-
-/**
- * The OptimizerNode is the base class of all nodes in the optimizer DAG. The optimizer DAG is the
- * optimizer's representation of a program, created before the actual optimization (which creates different
- * candidate plans and computes their cost).
- * <p>
- * Nodes in the DAG correspond (almost) one-to-one to the operators in a program. The optimizer DAG is constructed
- * to hold the additional information that the optimizer needs:
- * <ul>
- * <li>Estimates of the data size processed by each operator</li>
- * <li>Helper structures to track where the data flow "splits" and "joins", to support flows that are
- * DAGs but not trees.</li>
- * <li>Tags and weights to differentiate between loop-variant and -invariant parts of an iteration</li>
- * <li>Interesting properties to be used during the enumeration of candidate plans</li>
- * </ul>
- */
-public abstract class OptimizerNode implements Visitable<OptimizerNode>, EstimateProvider, DumpableNode<OptimizerNode> {
-
- public static final int MAX_DYNAMIC_PATH_COST_WEIGHT = 100;
-
- // --------------------------------------------------------------------------------------------
- // Members
- // --------------------------------------------------------------------------------------------
-
- private final Operator<?> operator; // The operator (Reduce / Join / DataSource / ...)
-
- private List<String> broadcastConnectionNames = new ArrayList<String>(); // the names of this node's broadcast inputs; appended together with broadcastConnections, so both lists share indexes
-
- private List<DagConnection> broadcastConnections = new ArrayList<DagConnection>(); // the broadcast inputs of this node
-
- private List<DagConnection> outgoingConnections; // The links to succeeding nodes; created lazily, null until the first connection is added
-
- private InterestingProperties intProps; // the interesting properties of this node, null until computed
-
- // --------------------------------- Branch Handling ------------------------------------------
-
- protected List<UnclosedBranchDescriptor> openBranches; // stack of branches in the sub-graph that are not joined
-
- protected Set<OptimizerNode> closedBranchingNodes; // set of branching nodes which have already been closed
-
- protected List<OptimizerNode> hereJoinedBranches; // the branching nodes (node with multiple outputs)
- // that are partially joined (through multiple inputs or broadcast vars)
-
- // ---------------------------- Estimates and Annotations -------------------------------------
-
- protected long estimatedOutputSize = -1; // the estimated size of the output (bytes), -1 if unknown
-
- protected long estimatedNumRecords = -1; // the estimated number of key/value pairs in the output, -1 if unknown
-
- protected Set<FieldSet> uniqueFields; // set of attributes that will always be unique after this node; null if none were declared
-
- // --------------------------------- General Parameters ---------------------------------------
-
- private int parallelism = -1; // the number of parallel instances of this node, -1 means "use the default"
-
- private long minimalMemoryPerSubTask = -1; // -1 while unset, see getMinimalMemoryAcrossAllSubTasks()
-
- protected int id = -1; // the id for this node, -1 until initId(int) has been called
-
- protected int costWeight = 1; // factor to weight the costs for dynamic paths
-
- protected boolean onDynamicPath; // set by identifyDynamicPath(int) when at least one input is on a dynamic path
-
- protected List<PlanNode> cachedPlans; // cache candidates, because they may be accessed repeatedly
-
- // ------------------------------------------------------------------------
- // Constructor / Setup
- // ------------------------------------------------------------------------
-
- /**
-  * Creates the optimizer node for the given program operator and immediately
-  * reads the operator's stub annotations through {@link #readStubAnnotations()}.
-  *
-  * @param op The program operator that this node stands for.
-  */
- public OptimizerNode(Operator<?> op) {
-     operator = op;
-     readStubAnnotations();
- }
-
- /**
-  * Copy constructor for subclasses. Takes over the operator, the interesting properties,
-  * the branch tracking structures, the estimates and the general parameters of the given node.
-  * <p>
-  * The open-branch list and the closed-branching-node set are shared by reference, not cloned.
-  * Connections (outgoing and broadcast), unique fields, {@code hereJoinedBranches} and cached
-  * plans are not taken over.
-  *
-  * @param toCopy The node whose state is taken over.
-  */
- protected OptimizerNode(OptimizerNode toCopy) {
-     // identity and general parameters
-     operator = toCopy.operator;
-     id = toCopy.id;
-     parallelism = toCopy.parallelism;
-     minimalMemoryPerSubTask = toCopy.minimalMemoryPerSubTask;
-
-     // optimizer annotations
-     intProps = toCopy.intProps;
-     estimatedOutputSize = toCopy.estimatedOutputSize;
-     estimatedNumRecords = toCopy.estimatedNumRecords;
-
-     // branch handling (same list / set instances as the original node)
-     openBranches = toCopy.openBranches;
-     closedBranchingNodes = toCopy.closedBranchingNodes;
-
-     // dynamic path information
-     costWeight = toCopy.costWeight;
-     onDynamicPath = toCopy.onDynamicPath;
- }
-
- // ------------------------------------------------------------------------
- // Methods specific to unary- / binary- / special nodes
- // ------------------------------------------------------------------------
-
- /**
- * Gets the name of this node, which is the name of the function/operator, or
- * data source / data sink. Each concrete node type supplies its own name.
- *
- * @return The node name.
- */
- public abstract String getName();
-
- /**
- * Connects the predecessors to this operator. Broadcast inputs are connected
- * separately, by {@link #setBroadcastInputs(Map, ExecutionMode)}.
- *
- * @param operatorToNode The map from program operators to optimizer nodes.
- * @param defaultExchangeMode The data exchange mode to use, if the operator does not
- * specify one.
- */
- public abstract void setInput(Map<Operator<?>, OptimizerNode> operatorToNode,
- ExecutionMode defaultExchangeMode);
-
- /**
- * This function connects the operators that produce the broadcast inputs to this operator.
- *
- * @param operatorToNode The map from program operators to optimizer nodes.
- * @param defaultExchangeMode The data exchange mode to use, if the operator does not
- * specify one.
- *
- * @throws CompilerException
- */
- public void setBroadcastInputs(Map<Operator<?>, OptimizerNode> operatorToNode, ExecutionMode defaultExchangeMode) {
- // skip for Operators that don't support broadcast variables
- if (!(getOperator() instanceof AbstractUdfOperator<?, ?>)) {
- return;
- }
-
- // get all broadcast inputs
- AbstractUdfOperator<?, ?> operator = ((AbstractUdfOperator<?, ?>) getOperator());
-
- // create connections and add them
- for (Map.Entry<String, Operator<?>> input : operator.getBroadcastInputs().entrySet()) {
- OptimizerNode predecessor = operatorToNode.get(input.getValue());
- DagConnection connection = new DagConnection(predecessor, this,
- ShipStrategyType.BROADCAST, defaultExchangeMode);
- addBroadcastConnection(input.getKey(), connection);
- predecessor.addOutgoingConnection(connection);
- }
- }
-
- /**
- * Gets all incoming connections of this node. Broadcast inputs are not part of this list,
- * they are available via {@link #getBroadcastConnections()}.
- * This method needs to be overridden by subclasses to return the children.
- *
- * @return The list of incoming connections.
- */
- public abstract List<DagConnection> getIncomingConnections();
-
- /**
- * Tells the node to compute the interesting properties for its inputs. The interesting properties
- * for the node itself must have been computed before.
- * The node must then see how many of the interesting properties it preserves and add its own.
- *
- * @param estimator The {@code CostEstimator} instance to use for plan cost estimation.
- */
- public abstract void computeInterestingPropertiesForInputs(CostEstimator estimator);
-
- /**
- * This method causes the node to compute the description of open branches in its sub-plan. An open branch
- * describes that a (transitive) child node had multiple outputs, which have not all been re-joined in the
- * sub-plan. This method needs to set the <code>openBranches</code> field to a stack of unclosed branches, the
- * latest one on top. A branch is considered closed if some later node sees all of the branching node's outputs,
- * no matter if there have been more branches to different paths in the meantime.
- */
- public abstract void computeUnclosedBranchStack();
-
-
- protected List<UnclosedBranchDescriptor> computeUnclosedBranchStackForBroadcastInputs(
- List<UnclosedBranchDescriptor> branchesSoFar)
- {
- // handle the data flow branching for the broadcast inputs
- for (DagConnection broadcastInput : getBroadcastConnections()) {
- OptimizerNode bcSource = broadcastInput.getSource();
- addClosedBranches(bcSource.closedBranchingNodes);
-
- List<UnclosedBranchDescriptor> bcBranches = bcSource.getBranchesForParent(broadcastInput);
-
- ArrayList<UnclosedBranchDescriptor> mergedBranches = new ArrayList<UnclosedBranchDescriptor>();
- mergeLists(branchesSoFar, bcBranches, mergedBranches, true);
- branchesSoFar = mergedBranches.isEmpty() ? Collections.<UnclosedBranchDescriptor>emptyList() : mergedBranches;
- }
-
- return branchesSoFar;
- }
-
- /**
- * Computes the plan alternatives for this node, and implicitly for all nodes that are children of
- * this node. This method must determine for each alternative the global and local properties
- * and the costs. This method may recursively call <code>getAlternativePlans()</code> on its children
- * to get their plan alternatives, and build its own alternatives on top of those.
- *
- * @param estimator
- * The cost estimator used to estimate the costs of each plan alternative.
- * @return A list containing all plan alternatives.
- */
- public abstract List<PlanNode> getAlternativePlans(CostEstimator estimator);
-
- /**
- * This method implements the visit of a depth-first graph traversing visitor. Implementers must first
- * call the <code>preVisit()</code> method, then hand the visitor to their children, and finally call
- * the <code>postVisit()</code> method.
- *
- * @param visitor
- * The graph traversing visitor.
- * @see org.apache.flink.util.Visitable#accept(org.apache.flink.util.Visitor)
- */
- @Override
- public abstract void accept(Visitor<OptimizerNode> visitor);
-
- public abstract SemanticProperties getSemanticProperties(); // presumably the semantic properties of this node's operator - TODO confirm against the subclasses
-
- // ------------------------------------------------------------------------
- // Getters / Setters
- // ------------------------------------------------------------------------
-
- /**
-  * Collects the source nodes of all inputs of this node: first the sources of the regular
-  * incoming connections, then the sources of the broadcast connections. A node that feeds
-  * this node through several connections appears once per connection.
-  *
-  * @return A newly created list with the predecessor nodes.
-  */
- @Override
- public Iterable<OptimizerNode> getPredecessors() {
-     final List<OptimizerNode> sources = new ArrayList<OptimizerNode>();
-
-     for (DagConnection regularInput : getIncomingConnections()) {
-         sources.add(regularInput.getSource());
-     }
-     for (DagConnection broadcastInput : getBroadcastConnections()) {
-         sources.add(broadcastInput.getSource());
-     }
-
-     return sources;
- }
-
- /**
-  * Returns the ID assigned to this node.
-  *
-  * @return The node's ID, or -1 as long as no ID has been assigned via {@link #initId(int)}.
-  */
- public int getId() {
-     return id;
- }
-
- /**
-  * Initializes the ID of this node. The ID can be assigned exactly once.
-  *
-  * @param id
-  *        The id for this node. Must be positive.
-  *
-  * @throws IllegalArgumentException If the given id is zero or negative.
-  * @throws IllegalStateException If the id of this node has already been initialized.
-  */
- public void initId(int id) {
-     if (id <= 0) {
-         throw new IllegalArgumentException("The node id must be positive, but was " + id + ".");
-     }
-
-     // -1 is the marker for "not yet initialized"
-     if (this.id != -1) {
-         throw new IllegalStateException("Id has already been initialized.");
-     }
-
-     this.id = id;
- }
-
- /**
-  * Registers a broadcast input at this node. The connection and its name are appended to two
-  * separate lists in the same call, so both end up at the same index.
-  *
-  * @param name The name under which the broadcast input is registered.
-  * @param broadcastConnection The connection to add.
-  */
- public void addBroadcastConnection(String name, DagConnection broadcastConnection) {
-     broadcastConnections.add(broadcastConnection);
-     broadcastConnectionNames.add(name);
- }
-
- /**
-  * Gets the names of the broadcast inputs of this node.
-  *
-  * @return The node's internal list of broadcast input names (not a copy).
-  */
- public List<String> getBroadcastConnectionNames() {
-     return broadcastConnectionNames;
- }
-
- /**
-  * Gets the connections through which the broadcast variables reach this node.
-  *
-  * @return The node's internal list of broadcast connections (not a copy).
-  */
- public List<DagConnection> getBroadcastConnections() {
-     return broadcastConnections;
- }
-
- /**
-  * Appends a connection to the outgoing connections of this node. The list of outgoing
-  * connections is created on the first call. A node accepts at most 64 outgoing connections.
-  *
-  * @param connection
-  *        The connection to add.
-  *
-  * @throws CompilerException If the node already has 64 outgoing connections.
-  */
- public void addOutgoingConnection(DagConnection connection) {
-     if (outgoingConnections == null) {
-         // first successor: create the list lazily
-         outgoingConnections = new ArrayList<DagConnection>();
-     }
-     else if (outgoingConnections.size() == 64) {
-         throw new CompilerException("Cannot currently handle nodes with more than 64 outputs.");
-     }
-
-     outgoingConnections.add(connection);
- }
-
- /**
-  * Gets the connections from this node to its succeeding nodes.
-  *
-  * @return The list of outgoing connections. As the list is created lazily, this is
-  *         {@code null} until the first outgoing connection has been added.
-  */
- public List<DagConnection> getOutgoingConnections() {
-     return outgoingConnections;
- }
-
- /**
-  * Gets the program operator that this optimizer node was created for.
-  *
-  * @return This node's operator.
-  */
- public Operator<?> getOperator() {
-     return operator;
- }
-
- /**
-  * Gets the parallelism for the operator represented by this optimizer node.
-  * The parallelism denotes how many parallel instances of the operator will be
-  * spawned during the execution. A value of <code>-1</code> means that the system
-  * takes the default number of parallel instances.
-  *
-  * @return The parallelism of the operator, or -1 for the default.
-  */
- public int getParallelism() {
-     return parallelism;
- }
-
- /**
-  * Sets the parallelism for this optimizer node, i.e. the number of parallel instances of the
-  * operator that will be spawned during the execution. The value <code>-1</code> selects the
-  * default number of parallel instances.
-  *
-  * @param parallelism The parallelism to set; either a positive number or -1.
-  * @throws IllegalArgumentException If the parallelism is smaller than one and not -1.
-  */
- public void setDegreeOfParallelism(int parallelism) {
-     final boolean isDefaultMarker = (parallelism == -1);
-     if (!isDefaultMarker && parallelism < 1) {
-         throw new IllegalArgumentException("Degree of parallelism of " + parallelism + " is invalid.");
-     }
-     this.parallelism = parallelism;
- }
-
- /**
-  * Gets the amount of memory that all subtasks of this task have jointly available, computed
-  * as the minimal memory per subtask multiplied by the parallelism.
-  * <p>
-  * NOTE(review): the parallelism is used as-is; if it still holds the default marker -1 while
-  * a per-subtask memory is set, the product is negative. Verify that callers set the
-  * parallelism first.
-  *
-  * @return The total amount of memory across all subtasks, or -1 if the per-subtask memory is not set.
-  */
- public long getMinimalMemoryAcrossAllSubTasks() {
-     if (minimalMemoryPerSubTask == -1) {
-         return -1;
-     }
-     return minimalMemoryPerSubTask * parallelism;
- }
-
- /**
-  * Checks whether this node has been marked as lying on a dynamic path
-  * (see {@link #identifyDynamicPath(int)}).
-  *
-  * @return True, if the node is on a dynamic path, false otherwise.
-  */
- public boolean isOnDynamicPath() {
-     return onDynamicPath;
- }
-
- /**
-  * Determines whether this node lies on a dynamic path. That is the case as soon as one of its
-  * regular or broadcast input connections is on a dynamic path. The node is then flagged and
-  * takes the given cost weight.
-  * <p>
-  * If only some of the inputs are dynamic, the node joins a static and a dynamic path. The
-  * regular input connections whose source is not on a dynamic path are then switched to the
-  * cached variant of their materialization mode.
-  *
-  * @param costWeight The cost weight to assign to this node if it is on a dynamic path.
-  */
- public void identifyDynamicPath(int costWeight) {
-     boolean someInputDynamic = false;
-     boolean everyInputDynamic = true;
-
-     for (DagConnection input : getIncomingConnections()) {
-         if (input.isOnDynamicPath()) {
-             someInputDynamic = true;
-         } else {
-             everyInputDynamic = false;
-         }
-     }
-
-     for (DagConnection broadcastInput : getBroadcastConnections()) {
-         if (broadcastInput.isOnDynamicPath()) {
-             someInputDynamic = true;
-         } else {
-             everyInputDynamic = false;
-         }
-     }
-
-     if (!someInputDynamic) {
-         // purely static node, nothing to mark
-         return;
-     }
-
-     this.onDynamicPath = true;
-     this.costWeight = costWeight;
-
-     if (everyInputDynamic) {
-         return;
-     }
-
-     // this node joins static and dynamic path.
-     // mark the connections where the source is not dynamic as cached
-     for (DagConnection input : getIncomingConnections()) {
-         if (!input.getSource().isOnDynamicPath()) {
-             input.setMaterializationMode(input.getMaterializationMode().makeCached());
-         }
-     }
-
-     // broadcast variables are always cached, because they stay available unchanged in the
-     // runtime context of the functions
- }
-
- /**
-  * Gets the factor by which the costs of this node are weighted. The weight is 1 unless
-  * {@link #identifyDynamicPath(int)} has placed the node on a dynamic path.
-  *
-  * @return The cost weight of this node.
-  */
- public int getCostWeight() {
-     return costWeight;
- }
-
- /**
-  * Gets the largest "max depth" value reported by any regular or broadcast input connection.
-  *
-  * @return The maximum over all input connections, or 0 if the node has no inputs.
-  */
- public int getMaxDepth() {
-     int deepest = 0;
-
-     for (DagConnection input : getIncomingConnections()) {
-         final int depth = input.getMaxDepth();
-         if (depth > deepest) {
-             deepest = depth;
-         }
-     }
-     for (DagConnection broadcastInput : getBroadcastConnections()) {
-         final int depth = broadcastInput.getMaxDepth();
-         if (depth > deepest) {
-             deepest = depth;
-         }
-     }
-
-     return deepest;
- }
-
- /**
-  * Gets the properties that are interesting for this node to produce.
-  *
-  * @return The interesting properties for this node, or null if they have not been computed
-  *         yet or have been cleared.
-  */
- public InterestingProperties getInterestingProperties() {
-     return intProps;
- }
-
- /**
-  * Gets the estimated size of this node's output in bytes.
-  *
-  * @return The estimated output size, or -1 if it is unknown.
-  */
- @Override
- public long getEstimatedOutputSize() {
-     return estimatedOutputSize;
- }
-
- /**
-  * Gets the estimated number of records in this node's output.
-  *
-  * @return The estimated number of records, or -1 if it is unknown.
-  */
- @Override
- public long getEstimatedNumRecords() {
-     return estimatedNumRecords;
- }
-
- /**
-  * Overrides the estimated size of this node's output. The value is stored without validation.
-  *
-  * @param estimatedOutputSize The estimated output size in bytes.
-  */
- public void setEstimatedOutputSize(long estimatedOutputSize) {
-     this.estimatedOutputSize = estimatedOutputSize;
- }
-
- /**
-  * Overrides the estimated number of records in this node's output. The value is stored
-  * without validation.
-  *
-  * @param estimatedNumRecords The estimated number of output records.
-  */
- public void setEstimatedNumRecords(long estimatedNumRecords) {
-     this.estimatedNumRecords = estimatedNumRecords;
- }
-
- /**
-  * Gets the estimated average number of bytes per output record, derived from the estimated
-  * output size and the estimated number of records.
-  *
-  * @return The average record width, or -1.0 unless both estimates are greater than zero.
-  */
- @Override
- public float getEstimatedAvgWidthPerOutputRecord() {
-     final boolean bothKnown = estimatedOutputSize > 0 && estimatedNumRecords > 0;
-     if (!bothKnown) {
-         return -1.0f;
-     }
-     return ((float) estimatedOutputSize) / estimatedNumRecords;
- }
-
- /**
-  * Checks whether the output of this node branches, i.e. whether the node has more than
-  * one outgoing connection.
-  *
-  * @return True, if the node's output branches. False otherwise, including the case that no
-  *         outgoing connections exist yet.
-  */
- public boolean isBranching() {
-     final List<DagConnection> outputs = getOutgoingConnections();
-     if (outputs == null) {
-         return false;
-     }
-     return outputs.size() > 1;
- }
-
- /**
-  * Marks every outgoing connection of this node as breaking the pipeline.
-  *
-  * @throws IllegalStateException If the list of outgoing connections has not been created yet.
-  */
- public void markAllOutgoingConnectionsAsPipelineBreaking() {
-     if (this.outgoingConnections != null) {
-         for (DagConnection output : getOutgoingConnections()) {
-             output.markBreaksPipeline();
-         }
-         return;
-     }
-
-     throw new IllegalStateException("The outgoing connections have not yet been initialized.");
- }
-
- // ------------------------------------------------------------------------
- // Miscellaneous
- // ------------------------------------------------------------------------
-
- /**
-  * Checks whether the interesting properties have been set on every outgoing connection.
-  * <p>
-  * NOTE(review): relies on {@link #getOutgoingConnections()} returning a non-null list.
-  *
-  * @return True, if all outgoing connections carry interesting properties (trivially true for
-  *         an empty list). False as soon as one connection has none.
-  */
- public boolean haveAllOutputConnectionInterestingProperties() {
-     final List<DagConnection> outputs = getOutgoingConnections();
-     for (int i = 0; i < outputs.size(); i++) {
-         if (outputs.get(i).getInterestingProperties() == null) {
-             return false;
-         }
-     }
-     return true;
- }
-
- /**
-  * Computes the interesting properties of this node as the union of the interesting
-  * properties on all outgoing connections and stores them in this node. The union starts from
-  * a clone of the first connection's properties, so the objects held by the connections are
-  * not assigned to this node directly. The remaining connections' properties are added to
-  * that clone, and trivial properties are dropped at the end.
-  * <p>
-  * A node without outgoing connections gets an empty set of interesting properties.
-  */
- public void computeUnionOfInterestingPropertiesFromSuccessors() {
-     final List<DagConnection> outputs = getOutgoingConnections();
-
-     if (outputs.isEmpty()) {
-         // no outgoing connections, so nobody requests anything from us
-         this.intProps = new InterestingProperties();
-     }
-     else {
-         final Iterator<DagConnection> remaining = outputs.iterator();
-         this.intProps = remaining.next().getInterestingProperties().clone();
-         while (remaining.hasNext()) {
-             this.intProps.addInterestingProperties(remaining.next().getInterestingProperties());
-         }
-     }
-
-     this.intProps.dropTrivials();
- }
-
- /**
-  * Discards the interesting properties of this node and clears the interesting properties on
-  * all of its regular and broadcast input connections.
-  */
- public void clearInterestingProperties() {
-     for (DagConnection input : getIncomingConnections()) {
-         input.clearInterestingProperties();
-     }
-     for (DagConnection broadcastInput : getBroadcastConnections()) {
-         broadcastInput.clearInterestingProperties();
-     }
-     this.intProps = null;
- }
-
- /**
-  * Computes the output estimates of this node (number of records, size in bytes).
-  * <p>
-  * The operator specific defaults are computed first, and negative results are normalized to
-  * -1 ("unknown"). If the operator carries compiler hints, they are applied afterwards in
-  * this order:
-  * <ol>
-  *   <li>A hinted output size and a hinted output cardinality replace the defaults.</li>
-  *   <li>A filter factor scales the record count (and the size, if known). If the record count
-  *       is unknown and this is a {@code SingleInputNode}, the count is derived from the
-  *       predecessor's record count.</li>
-  *   <li>An average record width of at least 1 is used to derive a missing record count from
-  *       the size, or a missing size from the record count.</li>
-  * </ol>
-  *
-  * @param statistics
-  *        The statistics object which may be accessed to get statistical information.
-  *        The parameter may be null, if no statistics are available.
-  *
-  * @throws CompilerException If an incoming connection has no source yet.
-  */
- public void computeOutputEstimates(DataStatistics statistics) {
-     // sanity check: all inputs must be connected
-     for (DagConnection input : getIncomingConnections()) {
-         if (input.getSource() == null) {
-             throw new CompilerException("Bug: Estimate computation called before inputs have been set.");
-         }
-     }
-
-     // operator specific defaults, with "unknown" normalized to -1
-     computeOperatorSpecificDefaultEstimates(statistics);
-     this.estimatedOutputSize = this.estimatedOutputSize < 0 ? -1 : this.estimatedOutputSize;
-     this.estimatedNumRecords = this.estimatedNumRecords < 0 ? -1 : this.estimatedNumRecords;
-
-     // without hints, the defaults are final
-     final Operator<?> op = getOperator();
-     if (op == null) {
-         return;
-     }
-     final CompilerHints hints = op.getCompilerHints();
-     if (hints == null) {
-         return;
-     }
-
-     // explicit hints win over the defaults
-     if (hints.getOutputSize() >= 0) {
-         this.estimatedOutputSize = hints.getOutputSize();
-     }
-     if (hints.getOutputCardinality() >= 0) {
-         this.estimatedNumRecords = hints.getOutputCardinality();
-     }
-
-     // apply the filter factor
-     if (hints.getFilterFactor() >= 0.0f) {
-         if (this.estimatedNumRecords < 0) {
-             // own cardinality unknown: fall back to the single predecessor, if there is one
-             if (this instanceof SingleInputNode) {
-                 final OptimizerNode predecessor = ((SingleInputNode) this).getPredecessorNode();
-                 if (predecessor != null && predecessor.getEstimatedNumRecords() >= 0) {
-                     this.estimatedNumRecords = (long) (predecessor.getEstimatedNumRecords() * hints.getFilterFactor());
-                 }
-             }
-         }
-         else {
-             this.estimatedNumRecords = (long) (this.estimatedNumRecords * hints.getFilterFactor());
-             if (this.estimatedOutputSize >= 0) {
-                 this.estimatedOutputSize = (long) (this.estimatedOutputSize * hints.getFilterFactor());
-             }
-         }
-     }
-
-     // use the width to infer the cardinality (given size) and vice versa
-     if (hints.getAvgOutputRecordSize() >= 1) {
-         final boolean recordsMissing = this.estimatedNumRecords == -1 && this.estimatedOutputSize >= 0;
-         final boolean sizeMissing = this.estimatedOutputSize == -1 && this.estimatedNumRecords >= 0;
-
-         if (recordsMissing) {
-             this.estimatedNumRecords = (long) (this.estimatedOutputSize / hints.getAvgOutputRecordSize());
-         }
-         else if (sizeMissing) {
-             this.estimatedOutputSize = (long) (this.estimatedNumRecords * hints.getAvgOutputRecordSize());
-         }
-     }
- }
-
- protected abstract void computeOperatorSpecificDefaultEstimates(DataStatistics statistics); // called by computeOutputEstimates() before the compiler hints are applied; negative estimates left behind are normalized to -1 there
-
- // ------------------------------------------------------------------------
- // Reading of stub annotations
- // ------------------------------------------------------------------------
-
- /**
-  * Reads the stub annotations of the operator. In this class, that covers the unique fields
-  * only (see {@link #readUniqueFieldsAnnotation()}). The method is invoked from the
-  * constructor.
-  */
- protected void readStubAnnotations() {
-     this.readUniqueFieldsAnnotation();
- }
-
- protected void readUniqueFieldsAnnotation() {
- if (this.operator.getCompilerHints() != null) {
- Set<FieldSet> uniqueFieldSets = operator.getCompilerHints().getUniqueFields();
- if (uniqueFieldSets != null) {
- if (this.uniqueFields == null) {
- this.uniqueFields = new HashSet<FieldSet>();
- }
- this.uniqueFields.addAll(uniqueFieldSets);
- }
- }
- }
-
- // ------------------------------------------------------------------------
- // Access of stub annotations
- // ------------------------------------------------------------------------
-
- /**
-  * Gets the FieldSets which are unique in the output of the node.
-  *
-  * @return The node's set of unique field sets, or an empty set if none have been recorded.
-  */
- public Set<FieldSet> getUniqueFields() {
-     if (this.uniqueFields != null) {
-         return this.uniqueFields;
-     }
-     return Collections.<FieldSet>emptySet();
- }
-
- // --------------------------------------------------------------------------------------------
- // Pruning
- // --------------------------------------------------------------------------------------------
-
- /**
-  * Prunes the given list of plan candidates in place.
-  * <p>
-  * If this node has no open branches, all candidates are pruned together. Otherwise the
-  * candidates are partitioned into groups that made the same sub-plan choice at every open
-  * branching node, and each group is pruned separately. This keeps candidates for every
-  * combination of branch choices, so that compatible candidates can still be found when the
-  * branches are joined again.
-  *
-  * @param plans The candidates to prune. The list is modified to hold the surviving candidates.
-  *
-  * @throws CompilerException If the list of candidates is empty.
-  */
- protected void prunePlanAlternatives(List<PlanNode> plans) {
-     if (plans.isEmpty()) {
-         throw new CompilerException("No plan meeting the requirements could be created @ " + this + ". Most likely reason: Too restrictive plan hints.");
-     }
-     // shortcut for the simple case
-     if (plans.size() == 1) {
-         return;
-     }
-
-     // we can only compare plan candidates that made equal choices
-     // at the branching points. for each choice at a branching point,
-     // we need to keep the cheapest (wrt. interesting properties).
-     // if we do not keep candidates for each branch choice, we might not
-     // find branch compatible candidates when joining the branches back.
-
-     // for pruning, we are quasi AFTER the node, so in the presence of
-     // branches, we need to form the per-branch-choice groups by the choice
-     // they made at the latest un-joined branching node. Note that this is
-     // different from the check for branch compatibility of candidates, as
-     // this happens on the input sub-plans and hence BEFORE the node (therefore
-     // it is relevant to find the latest (partially) joined branch point).
-
-     if (this.openBranches == null || this.openBranches.isEmpty()) {
-         prunePlanAlternativesWithCommonBranching(plans);
-     } else {
-         // partition the candidates into groups that made the same sub-plan candidate
-         // choice at the latest unclosed branch point
-
-         // the branching nodes of the open branches, latest one first
-         final OptimizerNode[] branchDeterminers = new OptimizerNode[this.openBranches.size()];
-
-         for (int i = 0; i < branchDeterminers.length; i++) {
-             branchDeterminers[i] = this.openBranches.get(this.openBranches.size() - 1 - i).getBranchingNode();
-         }
-
-         // this sorter sorts by the candidate choice at the branch point, so that
-         // candidates with the same choices end up next to each other
-         Comparator<PlanNode> sorter = new Comparator<PlanNode>() {
-
-             @Override
-             public int compare(PlanNode o1, PlanNode o2) {
-                 for (OptimizerNode branchDeterminer : branchDeterminers) {
-                     PlanNode n1 = o1.getCandidateAtBranchPoint(branchDeterminer);
-                     PlanNode n2 = o2.getCandidateAtBranchPoint(branchDeterminer);
-                     int hash1 = System.identityHashCode(n1);
-                     int hash2 = System.identityHashCode(n2);
-
-                     if (hash1 != hash2) {
-                         // compare explicitly: the difference "hash1 - hash2" can overflow
-                         // and then reports the wrong sign
-                         return hash1 < hash2 ? -1 : 1;
-                     }
-                 }
-                 return 0;
-             }
-         };
-         Collections.sort(plans, sorter);
-
-         List<PlanNode> result = new ArrayList<PlanNode>();
-         List<PlanNode> turn = new ArrayList<PlanNode>();
-
-         final PlanNode[] determinerChoice = new PlanNode[branchDeterminers.length];
-
-         while (!plans.isEmpty()) {
-             // take one as the determiner
-             turn.clear();
-             PlanNode determiner = plans.remove(plans.size() - 1);
-             turn.add(determiner);
-
-             for (int i = 0; i < determinerChoice.length; i++) {
-                 determinerChoice[i] = determiner.getCandidateAtBranchPoint(branchDeterminers[i]);
-             }
-
-             // go backwards through the plans and find all that are equal.
-             // the scan stops at the first candidate with a different choice
-             boolean stillEqual = true;
-             for (int k = plans.size() - 1; k >= 0 && stillEqual; k--) {
-                 PlanNode toCheck = plans.get(k);
-
-                 for (int i = 0; i < branchDeterminers.length; i++) {
-                     PlanNode checkerChoice = toCheck.getCandidateAtBranchPoint(branchDeterminers[i]);
-
-                     if (checkerChoice != determinerChoice[i]) {
-                         // not the same anymore
-                         stillEqual = false;
-                         break;
-                     }
-                 }
-
-                 if (stillEqual) {
-                     // the same
-                     plans.remove(k);
-                     turn.add(toCheck);
-                 }
-             }
-
-             // now that we have only plans with the same branch alternatives, prune!
-             if (turn.size() > 1) {
-                 prunePlanAlternativesWithCommonBranching(turn);
-             }
-             result.addAll(turn);
-         }
-
-         // after all turns are complete
-         plans.clear();
-         plans.addAll(result);
-     }
- }
-
- protected void prunePlanAlternativesWithCommonBranching(List<PlanNode> plans) {
- /* Prunes the given candidates in place. Kept are the overall cheapest candidate, for each
- * interesting global property the cheapest candidate that meets it, and for each pair of
- * interesting global and local properties the cheapest candidate that meets both.
- * NOTE(review): dereferences this.intProps, so the interesting properties of this node
- * must have been computed before this method is called. */
- final RequestedGlobalProperties[] gps = this.intProps.getGlobalProperties().toArray(
- new RequestedGlobalProperties[this.intProps.getGlobalProperties().size()]);
- final RequestedLocalProperties[] lps = this.intProps.getLocalProperties().toArray(
- new RequestedLocalProperties[this.intProps.getLocalProperties().size()]);
-
- final PlanNode[][] toKeep = new PlanNode[gps.length][]; // [global property][local property] -> cheapest candidate meeting both; rows are allocated on demand
- final PlanNode[] cheapestForGlobal = new PlanNode[gps.length]; // [global property] -> cheapest candidate meeting it
-
-
- PlanNode cheapest = null; // the overall cheapest plan
-
- // go over all plans from the list
- for (PlanNode candidate : plans) {
- // check if that plan is the overall cheapest (on equal costs, the earlier candidate is kept)
- if (cheapest == null || (cheapest.getCumulativeCosts().compareTo(candidate.getCumulativeCosts()) > 0)) {
- cheapest = candidate;
- }
-
- // find the interesting global properties that this plan matches
- for (int i = 0; i < gps.length; i++) {
- if (gps[i].isMetBy(candidate.getGlobalProperties())) {
- // the candidate meets the global property requirements. That means
- // it has a chance that its local properties are re-used (they would be
- // destroyed if global properties need to be established)
-
- if (cheapestForGlobal[i] == null || (cheapestForGlobal[i].getCumulativeCosts().compareTo(candidate.getCumulativeCosts()) > 0)) {
- cheapestForGlobal[i] = candidate;
- }
-
- final PlanNode[] localMatches;
- if (toKeep[i] == null) {
- localMatches = new PlanNode[lps.length];
- toKeep[i] = localMatches;
- } else {
- localMatches = toKeep[i];
- }
-
- for (int k = 0; k < lps.length; k++) {
- if (lps[k].isMetBy(candidate.getLocalProperties())) {
- final PlanNode previous = localMatches[k];
- if (previous == null || previous.getCumulativeCosts().compareTo(candidate.getCumulativeCosts()) > 0) {
- // this one is cheaper!
- localMatches[k] = candidate;
- }
- }
- }
- }
- }
- }
-
- // all candidates to keep are recorded now, rebuild the list from them
- plans.clear();
-
- // add the cheapest plan
- if (cheapest != null) {
- plans.add(cheapest);
- cheapest.setPruningMarker(); // remember that that plan is in the set
- }
-
- // add all others, which are optimal for some interesting properties
- // (the pruning marker prevents adding the same candidate twice; it is not reset in this method)
- for (int i = 0; i < gps.length; i++) {
- if (toKeep[i] != null) {
- final PlanNode[] localMatches = toKeep[i];
- for (final PlanNode n : localMatches) {
- if (n != null && !n.isPruneMarkerSet()) {
- n.setPruningMarker();
- plans.add(n);
- }
- }
- }
- if (cheapestForGlobal[i] != null) {
- final PlanNode n = cheapestForGlobal[i];
- if (!n.isPruneMarkerSet()) {
- n.setPruningMarker();
- plans.add(n);
- }
- }
- }
- }
-
-
- // --------------------------------------------------------------------------------------------
- // Handling of branches
- // --------------------------------------------------------------------------------------------
-
- /**
-  * Checks whether this node's sub-plan contains branches that have not been closed yet.
-  *
-  * @return True, if the list of open branches exists and is not empty. False otherwise.
-  */
- public boolean hasUnclosedBranches() {
-     if (this.openBranches == null) {
-         return false;
-     }
-     return !this.openBranches.isEmpty();
- }
-
- /**
-  * Gets the branching nodes that have already been closed.
-  *
-  * @return The node's internal set of closed branching nodes (not a copy); may be null.
-  */
- public Set<OptimizerNode> getClosedBranchingNodes() {
-     return closedBranchingNodes;
- }
-
- /**
-  * Gets the stack of branches in this node's sub-plan that have not been joined yet.
-  *
-  * @return The node's internal list of open branches (not a copy); may be null.
-  */
- public List<UnclosedBranchDescriptor> getOpenBranches() {
-     return openBranches;
- }
-
-
- protected List<UnclosedBranchDescriptor> getBranchesForParent(DagConnection toParent) {
- if (this.outgoingConnections.size() == 1) {
- // return our own stack of open branches, because nothing is added
- if (this.openBranches == null || this.openBranches.isEmpty()) {
- return Collections.emptyList();
- } else {
- return new ArrayList<UnclosedBranchDescriptor>(this.openBranches);
- }
- }
- else if (this.outgoingConnections.size() > 1) {
- // we branch add a branch info to the stack
- List<UnclosedBranchDescriptor> branches = new ArrayList<UnclosedBranchDescriptor>(4);
- if (this.openBranches != null) {
- branches.addAll(this.openBranches);
- }
-
- // find out, which output number the connection to the parent
- int num;
- for (num = 0; num < this.outgoingConnections.size(); num++) {
- if (this.outgoingConnections.get(num) == toParent) {
- break;
- }
- }
- if (num >= this.outgoingConnections.size()) {
- throw new CompilerException("Error in compiler: "
- + "Parent to get branch info for is not contained in the outgoing connections.");
- }
-
- // create the description and add it
- long bitvector = 0x1L << num;
- branches.add(new UnclosedBranchDescriptor(this, bitvector));
- return branches;
- }
- else {
- throw new CompilerException(
- "Error in compiler: Cannot get branch info for successor in a node with no successors.");
- }
- }
-
-
- protected void removeClosedBranches(List<UnclosedBranchDescriptor> openList) {
- if (openList == null || openList.isEmpty() || this.closedBranchingNodes == null || this.closedBranchingNodes.isEmpty()) {
- return;
- }
-
- Iterator<UnclosedBranchDescriptor> it = openList.iterator();
- while (it.hasNext()) {
- if (this.closedBranchingNodes.contains(it.next().getBranchingNode())) {
- //this branch was already closed --> remove it from the list
- it.remove();
- }
- }
- }
-
- /**
-  * Records all given branching nodes as closed at this node. The internal set is created on
-  * first use.
-  *
-  * @param alreadyClosed The branching nodes to record; <code>null</code> and empty sets are ignored.
-  */
- protected void addClosedBranches(Set<OptimizerNode> alreadyClosed) {
-     if (alreadyClosed != null && !alreadyClosed.isEmpty()) {
-         if (this.closedBranchingNodes != null) {
-             this.closedBranchingNodes.addAll(alreadyClosed);
-         } else {
-             this.closedBranchingNodes = new HashSet<OptimizerNode>(alreadyClosed);
-         }
-     }
- }
-
- /**
-  * Records a single branching node as closed at this node, creating the internal set if it does
-  * not exist yet.
-  *
-  * @param alreadyClosed The branching node to record.
-  */
- protected void addClosedBranch(OptimizerNode alreadyClosed) {
-     Set<OptimizerNode> closed = this.closedBranchingNodes;
-     if (closed == null) {
-         closed = new HashSet<OptimizerNode>();
-         this.closedBranchingNodes = closed;
-     }
-     closed.add(alreadyClosed);
- }
-
- /**
-  * Checks whether two candidate plans for the sub-plan of this node are comparable. The two
-  * alternative plans are comparable, if
-  *
-  * a) No branches are joined at this node, or
-  * b) for every branching node joined here, the two candidates do not use different
-  *    candidates at that branch point (a missing candidate on either side does not conflict).
-  *
-  * @param plan1 The root node of the first candidate plan.
-  * @param plan2 The root node of the second candidate plan.
-  * @return True if the nodes are branch compatible in the inputs.
-  * @throws NullPointerException Thrown, if either plan is <code>null</code>.
-  */
- protected boolean areBranchCompatible(PlanNode plan1, PlanNode plan2) {
-     if (plan1 == null || plan2 == null) {
-         throw new NullPointerException();
-     }
-
-     // without branches joined here (the common case) the loop is skipped and the
-     // candidates are trivially compatible
-     if (this.hereJoinedBranches != null) {
-         for (OptimizerNode brancher : this.hereJoinedBranches) {
-             final PlanNode first = plan1.getCandidateAtBranchPoint(brancher);
-             final PlanNode second = plan2.getCandidateAtBranchPoint(brancher);
-
-             final boolean bothPresent = first != null && second != null;
-             if (bothPresent && first != second) {
-                 return false;
-             }
-         }
-     }
-     return true;
- }
-
- /**
- * Merges the lists of open branches of two children into <code>result</code>. Descriptors that
- * refer to the same branching node in both lists are combined: identical path vectors are added
- * once, differing path vectors are OR-ed together, the branching node is remembered in
- * <code>hereJoinedBranches</code>, and the branch is recorded as closed once all outgoing
- * connections of the branching node are covered.
- *
- * The node IDs are assigned in graph-traversal order (pre-order), hence, each list is
- * sorted by ID in ascending order and all consecutive lists start with IDs in ascending order.
- *
- * Note that both input lists are modified: descriptors of branches already closed at this node
- * are removed from them before merging.
- *
- * @param child1open The open branches of the first child, may be null or empty.
- * @param child2open The open branches of the second child, may be null or empty.
- * @param result The list that receives the merged descriptors; it is cleared first.
- * @param markJoinedBranchesAsPipelineBreaking True, if the outgoing connections of every branching
- * node whose paths are re-joined here shall be marked as
- * pipeline breaking.
- * @return True, if at least one branching node occurred in both lists, false otherwise (including
- * the case where one of the lists is null or empty).
- */
- protected final boolean mergeLists(List<UnclosedBranchDescriptor> child1open,
- List<UnclosedBranchDescriptor> child2open,
- List<UnclosedBranchDescriptor> result,
- boolean markJoinedBranchesAsPipelineBreaking) {
-
- //remove branches which have already been closed
- removeClosedBranches(child1open);
- removeClosedBranches(child2open);
-
- result.clear();
-
- // check how many open branches we have. the cases:
- // 1) if both are null or empty, the result stays empty
- // 2) if one side is null (or empty), the result is the other side.
- // 3) both are set, then we need to merge.
- if (child1open == null || child1open.isEmpty()) {
- if(child2open != null && !child2open.isEmpty()) {
- result.addAll(child2open);
- }
- return false;
- }
-
- if (child2open == null || child2open.isEmpty()) {
- result.addAll(child1open);
- return false;
- }
-
- // both lists are walked from their ends (highest IDs) towards the front
- int index1 = child1open.size() - 1;
- int index2 = child2open.size() - 1;
-
- boolean didCloseABranch = false;
-
- // as both lists (child1open and child2open) are sorted in ascending ID order
- // we can do a merge-join-like loop which preserves the order in the result list
- // and eliminates duplicates
- while (index1 >= 0 || index2 >= 0) {
- int id1 = -1;
- int id2 = index2 >= 0 ? child2open.get(index2).getBranchingNode().getId() : -1;
-
- // take over all entries of the first list whose ID is larger than the current ID of the second list
- while (index1 >= 0 && (id1 = child1open.get(index1).getBranchingNode().getId()) > id2) {
- result.add(child1open.get(index1));
- index1--;
- }
- // take over all entries of the second list whose ID is larger than the current ID of the first list
- while (index2 >= 0 && (id2 = child2open.get(index2).getBranchingNode().getId()) > id1) {
- result.add(child2open.get(index2));
- index2--;
- }
-
- // match: they share a common branching child
- if (id1 == id2) {
- didCloseABranch = true;
-
- // if this is the latest common child, remember it
- OptimizerNode currBanchingNode = child1open.get(index1).getBranchingNode();
-
- long vector1 = child1open.get(index1).getJoinedPathsVector();
- long vector2 = child2open.get(index2).getJoinedPathsVector();
-
- // check if this is the same descriptor, (meaning that it contains the same paths)
- // if it is the same, add it only once, otherwise process the join of the paths
- if (vector1 == vector2) {
- result.add(child1open.get(index1));
- }
- else {
- // we merge (re-join) a branch
-
- // mark the branch as a point where we break the pipeline
- if (markJoinedBranchesAsPipelineBreaking) {
- currBanchingNode.markAllOutgoingConnectionsAsPipelineBreaking();
- }
-
- // remember that paths of this branching node are joined at this node
- if (this.hereJoinedBranches == null) {
- this.hereJoinedBranches = new ArrayList<OptimizerNode>(2);
- }
- this.hereJoinedBranches.add(currBanchingNode);
-
- // see, if this node closes the branch
- long joinedInputs = vector1 | vector2;
-
- // this is 2^size - 1, which is all bits set at positions 0..size-1
- long allInputs = (0x1L << currBanchingNode.getOutgoingConnections().size()) - 1;
-
- if (joinedInputs == allInputs) {
- // closed - we can remove it from the stack
- addClosedBranch(currBanchingNode);
- } else {
- // not quite closed
- result.add(new UnclosedBranchDescriptor(currBanchingNode, joinedInputs));
- }
- }
-
- index1--;
- index2--;
- }
- }
-
- // merged. now we need to reverse the list, because we added the elements in reverse order
- Collections.reverse(result);
- return didCloseABranch;
- }
-
- /**
-  * Gets the optimizer node behind this dumpable node, which is this node itself.
-  *
-  * @return This node.
-  */
- @Override
- public OptimizerNode getOptimizerNode() {
-     final OptimizerNode self = this;
-     return self;
- }
-
- /**
-  * Gets the plan node behind this dumpable node. This implementation has none to offer.
-  *
-  * @return Always <code>null</code>.
-  */
- @Override
- public PlanNode getPlanNode() {
-     final PlanNode none = null;
-     return none;
- }
-
- /**
-  * Collects all inputs of this node for dumping: first the regular incoming connections,
-  * then the broadcast connections.
-  *
-  * @return A newly created list holding all input connections.
-  */
- @Override
- public Iterable<DumpableConnection<OptimizerNode>> getDumpableInputs() {
-     final List<DumpableConnection<OptimizerNode>> inputs =
-             new ArrayList<DumpableConnection<OptimizerNode>>();
-
-     // regular inputs come first, broadcast inputs are appended after them
-     inputs.addAll(getIncomingConnections());
-     inputs.addAll(getBroadcastConnections());
-     return inputs;
- }
-
- /**
-  * Builds a textual form of this node: the node name, the operator name in parentheses, and
-  * for every incoming connection its one-based position together with the name of its ship
-  * strategy (the text "null" if no strategy is set).
-  *
-  * @return The string representation of this node.
-  */
- @Override
- public String toString() {
-     final StringBuilder text = new StringBuilder();
-     text.append(getName()).append(" (").append(getOperator().getName()).append(") ");
-
-     int position = 0;
-     for (DagConnection input : getIncomingConnections()) {
-         position++;
-
-         final String strategyName;
-         if (input.getShipStrategy() != null) {
-             strategyName = input.getShipStrategy().name();
-         } else {
-             strategyName = "null";
-         }
-
-         text.append('(').append(position).append(":").append(strategyName).append(')');
-     }
-
-     return text.toString();
- }
-
- // --------------------------------------------------------------------------------------------
-
- /**
-  * Description of an unclosed branch. A branch is unclosed when the data flow branched (the result
-  * of one operator is consumed by multiple targets), and these different branches (targets) have
-  * not yet been joined together again.
-  */
- public static final class UnclosedBranchDescriptor {
-
-     /** The node at which the data flow branched. */
-     protected OptimizerNode branchingNode;
-
-     /** The bit vector of the paths tracked by this descriptor. */
-     protected long joinedPathsVector;
-
-     /**
-      * Creates a new branching descriptor.
-      *
-      * @param branchingNode The node where the branch occurred (the node with multiple outputs).
-      * @param joinedPathsVector A bit vector describing which branches are tracked by this descriptor.
-      *                          The bit vector is one, where the branch is tracked, zero otherwise.
-      */
-     protected UnclosedBranchDescriptor(OptimizerNode branchingNode, long joinedPathsVector) {
-         this.joinedPathsVector = joinedPathsVector;
-         this.branchingNode = branchingNode;
-     }
-
-     /**
-      * Gets the node where the branch occurred.
-      *
-      * @return The branching node.
-      */
-     public OptimizerNode getBranchingNode() {
-         return branchingNode;
-     }
-
-     /**
-      * Gets the bit vector of the paths tracked by this descriptor.
-      *
-      * @return The joined paths vector.
-      */
-     public long getJoinedPathsVector() {
-         return joinedPathsVector;
-     }
-
-     @Override
-     public String toString() {
-         final StringBuilder text = new StringBuilder();
-         text.append("(").append(this.branchingNode.getOperator()).append(") [");
-         text.append(this.joinedPathsVector).append("]");
-         return text.toString();
-     }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java
deleted file mode 100644
index 5c811b0..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
-import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
-import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-/**
- * The optimizer's internal representation of a <i>Partition</i> operator node. The node offers
- * exactly one operator descriptor, which requests the partitioning on its input and otherwise
- * behaves as a no-op.
- */
-public class PartitionNode extends SingleInputNode {
-
-    /** The single descriptor that this node offers to the optimizer. */
-    private final List<OperatorDescriptorSingle> possibleProperties;
-
-    /**
-     * Creates the optimizer node for the given partition operator.
-     *
-     * @param operator The partition operator that this node represents.
-     */
-    public PartitionNode(PartitionOperatorBase<?> operator) {
-        super(operator);
-
-        final PartitionMethod method = getOperator().getPartitionMethod();
-        final Partitioner<?> partitioner = operator.getCustomPartitioner();
-
-        final OperatorDescriptorSingle descriptor = new PartitionDescriptor(method, this.keys, partitioner);
-        this.possibleProperties = Collections.singletonList(descriptor);
-    }
-
-    @Override
-    public PartitionOperatorBase<?> getOperator() {
-        return (PartitionOperatorBase<?>) super.getOperator();
-    }
-
-    @Override
-    public String getName() {
-        return "Partition";
-    }
-
-    @Override
-    protected List<OperatorDescriptorSingle> getPossibleProperties() {
-        return possibleProperties;
-    }
-
-    @Override
-    protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
-        // partitioning neither adds nor drops records, so the estimates of the input carry over
-        final OptimizerNode input = getPredecessorNode();
-        this.estimatedNumRecords = input.getEstimatedNumRecords();
-        this.estimatedOutputSize = input.getEstimatedOutputSize();
-    }
-
-    @Override
-    public SemanticProperties getSemanticProperties() {
-        return new SingleInputSemanticProperties.AllFieldsForwardedProperties();
-    }
-
-    // --------------------------------------------------------------------------------------------
-
-    /**
-     * The operator descriptor of the partition node. It requests global properties according to
-     * the partition method and passes the properties of its input through unchanged.
-     */
-    public static class PartitionDescriptor extends OperatorDescriptorSingle {
-
-        /** The method by which the data is partitioned. */
-        private final PartitionMethod pMethod;
-
-        /** The partitioner handed to the requested properties for the CUSTOM method. */
-        private final Partitioner<?> customPartitioner;
-
-        /**
-         * Creates a new descriptor.
-         *
-         * @param pMethod The partition method.
-         * @param pKeys The key fields of the partitioning.
-         * @param customPartitioner The partitioner used when the method is CUSTOM.
-         */
-        public PartitionDescriptor(PartitionMethod pMethod, FieldSet pKeys, Partitioner<?> customPartitioner) {
-            super(pKeys);
-
-            this.customPartitioner = customPartitioner;
-            this.pMethod = pMethod;
-        }
-
-        @Override
-        public DriverStrategy getStrategy() {
-            return DriverStrategy.UNARY_NO_OP;
-        }
-
-        @Override
-        public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
-            final SingleInputPlanNode planNode =
-                    new SingleInputPlanNode(node, "Partition", in, DriverStrategy.UNARY_NO_OP);
-            return planNode;
-        }
-
-        @Override
-        protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
-            final RequestedGlobalProperties requested = new RequestedGlobalProperties();
-
-            switch (this.pMethod) {
-                case HASH:
-                    requested.setHashPartitioned(this.keys);
-                    break;
-                case REBALANCE:
-                    requested.setForceRebalancing();
-                    break;
-                case CUSTOM:
-                    requested.setCustomPartitioned(this.keys, this.customPartitioner);
-                    break;
-                case RANGE:
-                    throw new UnsupportedOperationException("Not yet supported");
-                default:
-                    throw new IllegalArgumentException("Invalid partition method");
-            }
-
-            return Collections.singletonList(requested);
-        }
-
-        @Override
-        protected List<RequestedLocalProperties> createPossibleLocalProperties() {
-            // partitioning is purely a global concern, so nothing is requested locally
-            final RequestedLocalProperties noRequirements = new RequestedLocalProperties();
-            return Collections.singletonList(noRequirements);
-        }
-
-        @Override
-        public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
-            // the partition node itself is a no-op, the global properties pass through unchanged
-            return gProps;
-        }
-
-        @Override
-        public LocalProperties computeLocalProperties(LocalProperties lProps) {
-            // the partition node itself is a no-op, the local properties pass through unchanged
-            return lProps;
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PlanCacheCleaner.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PlanCacheCleaner.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PlanCacheCleaner.java
deleted file mode 100644
index bbe4607..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PlanCacheCleaner.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import org.apache.flink.util.Visitor;
-
-/**
- * A visitor that drops the cached plans of the nodes it visits. Only nodes that have cached
- * plans and are on the dynamic path are cleaned, and the traversal descends only below
- * such nodes.
- */
-final class PlanCacheCleaner implements Visitor<OptimizerNode> {
-
-    /** The shared instance of the cleaner. */
-    static final PlanCacheCleaner INSTANCE = new PlanCacheCleaner();
-
-    /**
-     * Clears the cached plans of the given node, if it has any and is on the dynamic path.
-     *
-     * @param visitable The node to clean.
-     * @return True, if the cache of the node was cleared, false otherwise.
-     */
-    @Override
-    public boolean preVisit(OptimizerNode visitable) {
-        final boolean needsCleaning = visitable.cachedPlans != null && visitable.isOnDynamicPath();
-        if (needsCleaning) {
-            visitable.cachedPlans = null;
-        }
-        return needsCleaning;
-    }
-
-    /**
-     * Does nothing; all work happens in the pre-visit.
-     *
-     * @param visitable The visited node (unused).
-     */
-    @Override
-    public void postVisit(OptimizerNode visitable) {
-        // intentionally empty
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
deleted file mode 100644
index 1477038..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.operators.AllReduceProperties;
-import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
-import org.apache.flink.optimizer.operators.ReduceProperties;
-
-/**
- * The Optimizer representation of a <i>Reduce</i> operator.
- */
-public class ReduceNode extends SingleInputNode {
-
-    /** The descriptors that this node offers to the optimizer. */
-    private final List<OperatorDescriptorSingle> possibleProperties;
-
-    /** The lazily created utility node that stands for the combiner of this reducer. */
-    private ReduceNode preReduceUtilityNode;
-
-
-    /**
-     * Creates the optimizer node for the given reduce operator. A reducer without keys is
-     * forced to a parallelism of one and gets the all-reduce descriptor.
-     *
-     * @param operator The reduce operator that this node represents.
-     */
-    public ReduceNode(ReduceOperatorBase<?, ?> operator) {
-        super(operator);
-
-        final OperatorDescriptorSingle props;
-        if (this.keys == null) {
-            // case of a key-less reducer. force a parallelism of 1
-            setDegreeOfParallelism(1);
-            props = new AllReduceProperties();
-        } else {
-            props = new ReduceProperties(this.keys, operator.getCustomPartitioner());
-        }
-
-        this.possibleProperties = Collections.singletonList(props);
-    }
-
-    /**
-     * Creates a copy of the given reducer node for use as a combiner utility node. The copy
-     * offers no operator descriptors.
-     *
-     * @param reducerToCopyForCombiner The reducer node to copy.
-     */
-    public ReduceNode(ReduceNode reducerToCopyForCombiner) {
-        super(reducerToCopyForCombiner);
-
-        this.possibleProperties = Collections.emptyList();
-    }
-
-    // ------------------------------------------------------------------------
-
-    @Override
-    public ReduceOperatorBase<?, ?> getOperator() {
-        return (ReduceOperatorBase<?, ?>) super.getOperator();
-    }
-
-    @Override
-    public String getName() {
-        return "Reduce";
-    }
-
-    @Override
-    protected List<OperatorDescriptorSingle> getPossibleProperties() {
-        return possibleProperties;
-    }
-
-    // --------------------------------------------------------------------------------------------
-    //  Estimates
-    // --------------------------------------------------------------------------------------------
-
-    @Override
-    protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
-        // no real estimates possible for a reducer.
-    }
-
-    /**
-     * Gets the utility node that represents the combiner of this reducer. The node is created
-     * on the first call and reused afterwards.
-     *
-     * @return The combiner utility node.
-     */
-    public ReduceNode getCombinerUtilityNode() {
-        if (this.preReduceUtilityNode != null) {
-            return this.preReduceUtilityNode;
-        }
-
-        final ReduceNode combiner = new ReduceNode(this);
-        this.preReduceUtilityNode = combiner;
-
-        // we conservatively assume the combiner returns the same data size as it consumes
-        combiner.estimatedOutputSize = getPredecessorNode().getEstimatedOutputSize();
-        combiner.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords();
-
-        return combiner;
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java
deleted file mode 100644
index cc12bb8..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java
+++ /dev/null
@@ -1,518 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.dag;
-
-import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
-import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
-import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.SingleInputOperator;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.costs.CostEstimator;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.InterestingProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.NamedChannel;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport;
-import org.apache.flink.optimizer.util.NoOpUnaryUdfOp;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.io.network.DataExchangeMode;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.util.Visitor;
-
-import com.google.common.collect.Sets;
-
-/**
- * A node in the optimizer's program representation for an operation with a single input.
- *
- * This class contains all the generic logic for handling branching flows, as well as to
- * enumerate candidate execution plans. The subclasses for specific operators simply add logic
- * for cost estimates and specify possible strategies for their execution.
- */
-public abstract class SingleInputNode extends OptimizerNode {
-
- protected final FieldSet keys; // The set of key fields
-
- protected DagConnection inConn; // the input of the node
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Creates a new node with a single input for the optimizer plan.
- *
- * @param programOperator The PACT that the node represents.
- */
- protected SingleInputNode(SingleInputOperator<?, ?, ?> programOperator) {
- super(programOperator);
-
- int[] k = programOperator.getKeyColumns(0);
- this.keys = k == null || k.length == 0 ? null : new FieldSet(k);
- }
-
- /**
-  * Creates a new node around the no-op unary operator, using the given key fields.
-  *
-  * @param keys The key fields of the node, possibly <code>null</code>.
-  */
- protected SingleInputNode(FieldSet keys) {
-     super(NoOpUnaryUdfOp.INSTANCE);
-     this.keys = keys;
- }
-
- /**
-  * Creates a new node around the no-op unary operator, without any key fields.
-  */
- protected SingleInputNode() {
-     this((FieldSet) null);
- }
-
- /**
-  * Creates a new node as a copy of the given node, sharing its key fields.
-  *
-  * @param toCopy The node to copy.
-  */
- protected SingleInputNode(SingleInputNode toCopy) {
-     super(toCopy);
-     this.keys = toCopy.keys;
- }
-
- // --------------------------------------------------------------------------------------------
-
- /**
-  * Gets the operator of this node, typed as a single-input operator.
-  *
-  * @return The operator represented by this node.
-  */
- @Override
- public SingleInputOperator<?, ?, ?> getOperator() {
-     final SingleInputOperator<?, ?, ?> operator = (SingleInputOperator<?, ?, ?>) super.getOperator();
-     return operator;
- }
-
- /**
-  * Gets the connection through which this node receives its input.
-  *
-  * @return The input connection, or <code>null</code> if none has been set.
-  */
- public DagConnection getIncomingConnection() {
-     return inConn;
- }
-
- /**
-  * Sets the input of this operator.
-  *
-  * @param inConn The connection from which this node reads its input.
-  */
- public void setIncomingConnection(DagConnection inConn) {
-     final DagConnection input = inConn;
-     this.inConn = input;
- }
-
- /**
-  * Gets the node that produces the input of this node, which is the source of the
-  * incoming connection.
-  *
-  * @return The predecessor of this node, or <code>null</code> if no input connection is set.
-  */
- public OptimizerNode getPredecessorNode() {
-     return this.inConn == null ? null : this.inConn.getSource();
- }
-
- /**
-  * Gets the incoming connections of this node, which is always a list with exactly one
-  * element, the single input connection.
-  *
-  * @return An immutable one-element list holding the input connection.
-  */
- @Override
- public List<DagConnection> getIncomingConnections() {
-     final DagConnection onlyInput = this.inConn;
-     return Collections.singletonList(onlyInput);
- }
-
-
- /**
-  * Gets the semantic properties of this node, as declared by its operator.
-  *
-  * @return The semantic properties of the operator.
-  */
- @Override
- public SemanticProperties getSemanticProperties() {
-     final SingleInputOperator<?, ?, ?> operator = getOperator();
-     return operator.getSemanticProperties();
- }
-
-
- /**
-  * Connects this node to the optimizer node of its input operator. A ship strategy hint in the
-  * operator parameters, if present, is translated and fixed on the new connection. The
-  * connection is registered as the incoming connection of this node and as an outgoing
-  * connection of the predecessor.
-  *
-  * @param contractToNode The map from program operators to their optimizer nodes.
-  * @param defaultExchangeMode The execution mode handed to the newly created connection.
-  * @throws CompilerException Thrown, if the ship strategy hint is not recognized, or if the
-  *                           operator has no input.
-  */
- @Override
- public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, ExecutionMode defaultExchangeMode)
-     throws CompilerException
- {
-     // translate the ship strategy hint, if the operator carries one
-     final Configuration parameters = getOperator().getParameters();
-     final String hint = parameters.getString(Optimizer.HINT_SHIP_STRATEGY, null);
-
-     ShipStrategyType preSet = null;
-     if (hint != null) {
-         if (hint.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH)) {
-             preSet = ShipStrategyType.PARTITION_HASH;
-         }
-         else if (hint.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE)) {
-             preSet = ShipStrategyType.PARTITION_RANGE;
-         }
-         else if (hint.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_FORWARD)) {
-             preSet = ShipStrategyType.FORWARD;
-         }
-         else if (hint.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) {
-             preSet = ShipStrategyType.PARTITION_RANDOM;
-         }
-         else {
-             throw new CompilerException("Unrecognized ship strategy hint: " + hint);
-         }
-     }
-
-     // look up the operator that produces our input
-     final Operator<?> inputOperator = getOperator().getInput();
-     if (inputOperator == null) {
-         throw new CompilerException("Error: Node for '" + getOperator().getName() + "' has no input.");
-     }
-
-     // create the connection from the predecessor to this node
-     final OptimizerNode pred = contractToNode.get(inputOperator);
-     final DagConnection conn = new DagConnection(pred, this, defaultExchangeMode);
-     if (preSet != null) {
-         conn.setShipStrategy(preSet);
-     }
-
-     // register the connection on both ends
-     setIncomingConnection(conn);
-     pred.addOutgoingConnection(conn);
- }
-
- // --------------------------------------------------------------------------------------------
- // Properties and Optimization
- // --------------------------------------------------------------------------------------------
-
- /**
- * Gets the operator descriptors of this node. The global and local properties requested by
- * these descriptors are added to the interesting properties of the input.
- *
- * @return The list of operator descriptors of this node.
- */
- protected abstract List<OperatorDescriptorSingle> getPossibleProperties();
-
- @Override
- public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
- // get what we inherit and what is preserved by our user code
- final InterestingProperties props = getInterestingProperties().filterByCodeAnnotations(this, 0);
-
- // add all properties relevant to this node
- for (OperatorDescriptorSingle dps : getPossibleProperties()) {
- for (RequestedGlobalProperties gp : dps.getPossibleGlobalProperties()) {
-
- if (gp.getPartitioning().isPartitionedOnKey()) {
- // make sure that among the same partitioning types, we do not push anything down that has fewer key fields
-
- for (RequestedGlobalProperties contained : props.getGlobalProperties()) {
- if (contained.getPartitioning() == gp.getPartitioning() && gp.getPartitionedFields().isValidSubset(contained.getPartitionedFields())) {
- props.getGlobalProperties().remove(contained);
- break;
- }
- }
- }
-
- props.addGlobalProperties(gp);
- }
-
- for (RequestedLocalProperties lp : dps.getPossibleLocalProperties()) {
- props.addLocalProperties(lp);
- }
- }
- this.inConn.setInterestingProperties(props);
-
- for (DagConnection conn : getBroadcastConnections()) {
- conn.setInterestingProperties(new InterestingProperties());
- }
- }
-
-
- @Override
- public List<PlanNode> getAlternativePlans(CostEstimator estimator) {
- // check if we have a cached version
- if (this.cachedPlans != null) {
- return this.cachedPlans;
- }
-
- boolean childrenSkippedDueToReplicatedInput = false;
-
- // calculate alternative sub-plans for predecessor
- final List<? extends PlanNode> subPlans = getPredecessorNode().getAlternativePlans(estimator);
- final Set<RequestedGlobalProperties> intGlobal = this.inConn.getInterestingProperties().getGlobalProperties();
-
- // calculate alternative sub-plans for broadcast inputs
- final List<Set<? extends NamedChannel>> broadcastPlanChannels = new ArrayList<Set<? extends NamedChannel>>();
- List<DagConnection> broadcastConnections = getBroadcastConnections();
- List<String> broadcastConnectionNames = getBroadcastConnectionNames();
-
- for (int i = 0; i < broadcastConnections.size(); i++ ) {
- DagConnection broadcastConnection = broadcastConnections.get(i);
- String broadcastConnectionName = broadcastConnectionNames.get(i);
- List<PlanNode> broadcastPlanCandidates = broadcastConnection.getSource().getAlternativePlans(estimator);
-
- // wrap the plan candidates in named channels
- HashSet<NamedChannel> broadcastChannels = new HashSet<NamedChannel>(broadcastPlanCandidates.size());
- for (PlanNode plan: broadcastPlanCandidates) {
- NamedChannel c = new NamedChannel(broadcastConnectionName, plan);
- DataExchangeMode exMode = DataExchangeMode.select(broadcastConnection.getDataExchangeMode(),
- ShipStrategyType.BROADCAST, broadcastConnection.isBreakingPipeline());
- c.setShipStrategy(ShipStrategyType.BROADCAST, exMode);
- broadcastChannels.add(c);
- }
- broadcastPlanChannels.add(broadcastChannels);
- }
-
- final RequestedGlobalProperties[] allValidGlobals;
- {
- Set<RequestedGlobalProperties> pairs = new HashSet<RequestedGlobalProperties>();
- for (OperatorDescriptorSingle ods : getPossibleProperties()) {
- pairs.addAll(ods.getPossibleGlobalProperties());
- }
- allValidGlobals = pairs.toArray(new RequestedGlobalProperties[pairs.size()]);
- }
- final ArrayList<PlanNode> outputPlans = new ArrayList<PlanNode>();
-
- final ExecutionMode executionMode = this.inConn.getDataExchangeMode();
-
- final int dop = getParallelism();
- final int inDop = getPredecessorNode().getParallelism();
- final boolean dopChange = inDop != dop;
-
- final boolean breaksPipeline = this.inConn.isBreakingPipeline();
-
- // create all candidates
- for (PlanNode child : subPlans) {
-
- if (child.getGlobalProperties().isFullyReplicated()) {
- // fully replicated input is always locally forwarded if DOP is not changed
- if (dopChange) {
- // can not continue with this child
- childrenSkippedDueToReplicatedInput = true;
- continue;
- } else {
- this.inConn.setShipStrategy(ShipStrategyType.FORWARD);
- }
- }
-
- if (this.inConn.getShipStrategy() == null) {
- // pick the strategy ourselves
- for (RequestedGlobalProperties igps: intGlobal) {
- final Channel c = new Channel(child, this.inConn.getMaterializationMode());
- igps.parameterizeChannel(c, dopChange, executionMode, breaksPipeline);
-
- // if the DOP changed, make sure that we cancel out properties, unless the
- // ship strategy preserves/establishes them even under changing DOPs
- if (dopChange && !c.getShipStrategy().isNetworkStrategy()) {
- c.getGlobalProperties().reset();
- }
-
- // check whether we meet any of the accepted properties
- // we may remove this check, when we do a check to not inherit
- // requested global properties that are incompatible with all possible
- // requested properties
- for (RequestedGlobalProperties rgps: allValidGlobals) {
- if (rgps.isMetBy(c.getGlobalProperties())) {
- c.setRequiredGlobalProps(rgps);
- addLocalCandidates(c, broadcastPlanChannels, igps, outputPlans, estimator);
- break;
- }
- }
- }
- } else {
- // hint fixed the strategy
- final Channel c = new Channel(child, this.inConn.getMaterializationMode());
- final ShipStrategyType shipStrategy = this.inConn.getShipStrategy();
- final DataExchangeMode exMode = DataExchangeMode.select(executionMode, shipStrategy, breaksPipeline);
-
- if (this.keys != null) {
- c.setShipStrategy(shipStrategy, this.keys.toFieldList(), exMode);
- } else {
- c.setShipStrategy(shipStrategy, exMode);
- }
-
- if (dopChange) {
- c.adjustGlobalPropertiesForFullParallelismChange();
- }
-
- // check whether we meet any of the accepted properties
- for (RequestedGlobalProperties rgps: allValidGlobals) {
- if (rgps.isMetBy(c.getGlobalProperties())) {
- addLocalCandidates(c, broadcastPlanChannels, rgps, outputPlans, estimator);
- break;
- }
- }
- }
- }
-
- if(outputPlans.isEmpty()) {
- if(childrenSkippedDueToReplicatedInput) {
- throw new CompilerException("No plan meeting the requirements could be created @ " + this + ". Most likely reason: Invalid use of replicated input.");
- } else {
- throw new CompilerException("No plan meeting the requirements could be created @ " + this + ". Most likely reason: Too restrictive plan hints.");
- }
- }
-
- // cost and prune the plans
- for (PlanNode node : outputPlans) {
- estimator.costOperator(node);
- }
- prunePlanAlternatives(outputPlans);
- outputPlans.trimToSize();
-
- this.cachedPlans = outputPlans;
- return outputPlans;
- }
-
- /**
-  * Creates plan candidates from the given channel template by trying every interesting local
-  * property set of the input connection.
-  *
-  * <p>For each interesting local property set, the template is cloned and the clone is
-  * parameterized with that set. The operator descriptors from {@code getPossibleProperties()}
-  * are then scanned in order; the first local requirement that is met by the clone's local
-  * properties is recorded on the clone and handed to {@code instantiateCandidate}. The scan
-  * stops at that first match, so each interesting local property set triggers at most one
-  * instantiation call.
-  *
-  * @param template The channel that is cloned for every attempt.
-  * @param broadcastPlanChannels The broadcast channel alternatives, passed through to the instantiation.
-  * @param rgps The requested global properties, passed through to the instantiation.
-  * @param target The list that is handed to the instantiation to collect the candidates.
-  * @param estimator The cost estimator, passed through to the instantiation.
-  */
- protected void addLocalCandidates(Channel template, List<Set<? extends NamedChannel>> broadcastPlanChannels, RequestedGlobalProperties rgps,
-         List<PlanNode> target, CostEstimator estimator)
- {
-     for (RequestedLocalProperties interesting : this.inConn.getInterestingProperties().getLocalProperties()) {
-         final Channel candidate = template.clone();
-         interesting.parameterizeChannel(candidate);
-
-         // find the first descriptor / local requirement pair that the parameterized channel satisfies
-         boolean instantiated = false;
-         for (OperatorDescriptorSingle descriptor : getPossibleProperties()) {
-             for (RequestedLocalProperties accepted : descriptor.getPossibleLocalProperties()) {
-                 if (accepted.isMetBy(candidate.getLocalProperties())) {
-                     candidate.setRequiredLocalProps(accepted);
-                     instantiateCandidate(descriptor, candidate, broadcastPlanChannels, target, estimator, rgps, interesting);
-                     instantiated = true;
-                     break;
-                 }
-             }
-             if (instantiated) {
-                 // one candidate per interesting property set is enough
-                 break;
-             }
-         }
-     }
- }
-
- /**
- * Instantiates plan candidates for the given operator descriptor and input channel and appends
- * them to {@code target}. One candidate is created per combination of broadcast input channels
- * (cartesian product over {@code broadcastPlanChannels}) that passes the branch compatibility
- * checks.
- *
- * <p>For every accepted combination, the node is created through {@code dps.instantiate(in, this)},
- * the broadcast inputs are attached, and the node's properties are initialized from clones of the
- * channel's global and local properties, transformed by the descriptor and filtered by this node's
- * semantic properties.
- *
- * <p>NOTE(review): {@code estimator}, {@code globPropsReq} and {@code locPropsReq} are not read
- * anywhere in this method body.
- *
- * @param dps The operator descriptor used to instantiate the plan node and to compute its properties.
- * @param in The input channel of the candidate. Its temp mode may be changed by this method.
- * @param broadcastPlanChannels The alternative channels per broadcast input.
- * @param target The list to which the created candidates are added.
- * @param estimator The cost estimator (unused in this method).
- * @param globPropsReq The requested global properties (unused in this method).
- * @param locPropsReq The requested local properties (unused in this method).
- *
- * @throws CompilerException Thrown, if the dam tracing reports NOT_FOUND or an unexpected result.
- */
- protected void instantiateCandidate(OperatorDescriptorSingle dps, Channel in, List<Set<? extends NamedChannel>> broadcastPlanChannels,
- List<PlanNode> target, CostEstimator estimator, RequestedGlobalProperties globPropsReq, RequestedLocalProperties locPropsReq)
- {
- final PlanNode inputSource = in.getSource();
-
- // each element of the cartesian product picks one channel per broadcast input
- for (List<NamedChannel> broadcastChannelsCombination: Sets.cartesianProduct(broadcastPlanChannels)) {
-
- boolean validCombination = true;
- boolean requiresPipelinebreaker = false;
-
- // check whether the broadcast inputs use the same plan candidate at the branching point
- for (int i = 0; i < broadcastChannelsCombination.size(); i++) {
- NamedChannel nc = broadcastChannelsCombination.get(i);
- PlanNode bcSource = nc.getSource();
-
- // check branch compatibility against input
- if (!areBranchCompatible(bcSource, inputSource)) {
- validCombination = false;
- break;
- }
-
- // check branch compatibility against all other broadcast variables
- // (only the ones with a smaller index; later ones are compared when their turn comes)
- for (int k = 0; k < i; k++) {
- PlanNode otherBcSource = broadcastChannelsCombination.get(k).getSource();
-
- if (!areBranchCompatible(bcSource, otherBcSource)) {
- validCombination = false;
- // NOTE(review): this break leaves only the inner k-loop; the dam check below
- // still runs for a combination that is already marked invalid
- break;
- }
- }
-
- // check if there is a common predecessor and whether there is a dam on the way to all common predecessors
- // NOTE(review): this check only looks at the regular input (in.getSource()), not at the
- // broadcast channel of the current iteration, so it is repeated unchanged for every i
- if (this.hereJoinedBranches != null) {
- for (OptimizerNode brancher : this.hereJoinedBranches) {
- PlanNode candAtBrancher = in.getSource().getCandidateAtBranchPoint(brancher);
-
- if (candAtBrancher == null) {
- // closed branch between two broadcast variables
- continue;
- }
-
- SourceAndDamReport res = in.getSource().hasDamOnPathDownTo(candAtBrancher);
- if (res == NOT_FOUND) {
- throw new CompilerException("Bug: Tracing dams for deadlock detection is broken.");
- } else if (res == FOUND_SOURCE) {
- // the branch point was reached without a dam on the path
- requiresPipelinebreaker = true;
- // leaves only the loop over the joined branches
- break;
- } else if (res == FOUND_SOURCE_AND_DAM) {
- // good
- } else {
- // unexpected report value
- throw new CompilerException();
- }
- }
- }
- }
-
- // skip combinations that failed a branch compatibility check
- if (!validCombination) {
- continue;
- }
-
- // NOTE(review): 'in' is shared by all combinations, so a temp mode set here stays in
- // effect for all following combinations as well
- if (requiresPipelinebreaker) {
- in.setTempMode(in.getTempMode().makePipelineBreaker());
- }
-
- final SingleInputPlanNode node = dps.instantiate(in, this);
- node.setBroadcastInputs(broadcastChannelsCombination);
-
- // compute how the strategy affects the properties
- // (works on clones, the properties of the channel itself are not passed to the descriptor)
- GlobalProperties gProps = in.getGlobalProperties().clone();
- LocalProperties lProps = in.getLocalProperties().clone();
- gProps = dps.computeGlobalProperties(gProps);
- lProps = dps.computeLocalProperties(lProps);
-
- SemanticProperties props = this.getSemanticProperties();
- // filter by the user code field copies
- // (second argument 0: presumably the index of the input - TODO confirm)
- gProps = gProps.filterBySemanticProperties(props, 0);
- lProps = lProps.filterBySemanticProperties(props, 0);
-
- // apply
- node.initProperties(gProps, lProps);
- node.updatePropertiesWithUniqueSets(getUniqueFields());
- target.add(node);
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Branch Handling
- // --------------------------------------------------------------------------------------------
-
- /**
-  * Computes the list of open branches for this node, if that has not happened yet.
-  *
-  * <p>The closed branching nodes of the predecessor are added to this node, the predecessor's
-  * branches for the input connection are fetched, and those are combined with the branches of
-  * the broadcast inputs. An absent or empty result is stored as the empty list.
-  */
- @Override
- public void computeUnclosedBranchStack() {
-     if (this.openBranches == null) {
-         addClosedBranches(getPredecessorNode().closedBranchingNodes);
-         final List<UnclosedBranchDescriptor> inputBranches = getPredecessorNode().getBranchesForParent(this.inConn);
-
-         // handle the data flow branching for the broadcast inputs
-         final List<UnclosedBranchDescriptor> merged = computeUnclosedBranchStackForBroadcastInputs(inputBranches);
-
-         if (merged != null && !merged.isEmpty()) {
-             this.openBranches = merged;
-         } else {
-             this.openBranches = Collections.<UnclosedBranchDescriptor>emptyList();
-         }
-     }
- }
-
- // --------------------------------------------------------------------------------------------
- // Miscellaneous
- // --------------------------------------------------------------------------------------------
-
- /**
-  * Lets the visitor traverse this node, its predecessor, and the sources of its broadcast
-  * connections, in that order. Nothing below this node is visited when {@code preVisit}
-  * returns false; {@code postVisit} is called after all inputs have been traversed.
-  *
-  * @param visitor The visitor to apply.
-  *
-  * @throws CompilerException Thrown, if this node has no predecessor.
-  */
- @Override
- public void accept(Visitor<OptimizerNode> visitor) {
-     if (!visitor.preVisit(this)) {
-         return;
-     }
-
-     if (getPredecessorNode() == null) {
-         throw new CompilerException();
-     }
-     getPredecessorNode().accept(visitor);
-
-     for (DagConnection broadcastInput : getBroadcastConnections()) {
-         broadcastInput.getSource().accept(visitor);
-     }
-
-     visitor.postVisit(this);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java
deleted file mode 100644
index 40725ba..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
-import org.apache.flink.optimizer.operators.UtilSinkJoinOpDescriptor;
-import org.apache.flink.optimizer.util.NoOpBinaryUdfOp;
-import org.apache.flink.types.Nothing;
-
-/**
- * A utility node that is not part of the actual plan. Plans with multiple data sinks are
- * transformed into plans with a single root node by joining the sinks with this node. That way,
- * the logic that avoids double-counting costs and that selects candidates for nodes with
- * multiple outputs is reused transparently.
- */
-public class SinkJoiner extends TwoInputNode {
-
-    /**
-     * Creates a sink joiner on top of the two given nodes. Both inputs are connected with
-     * pipelined execution mode and the parallelism of the joiner is set to one.
-     *
-     * @param input1 The node that becomes the first input.
-     * @param input2 The node that becomes the second input.
-     */
-    public SinkJoiner(OptimizerNode input1, OptimizerNode input2) {
-        super(new NoOpBinaryUdfOp<Nothing>(new NothingTypeInfo()));
-
-        final DagConnection firstConnection = new DagConnection(input1, this, null, ExecutionMode.PIPELINED);
-        final DagConnection secondConnection = new DagConnection(input2, this, null, ExecutionMode.PIPELINED);
-
-        this.input1 = firstConnection;
-        this.input2 = secondConnection;
-
-        setDegreeOfParallelism(1);
-    }
-
-    @Override
-    public String getName() {
-        return "Internal Utility Node";
-    }
-
-    /**
-     * The sink joiner has no outgoing connections.
-     *
-     * @return Always the empty list.
-     */
-    @Override
-    public List<DagConnection> getOutgoingConnections() {
-        return Collections.<DagConnection>emptyList();
-    }
-
-    /**
-     * Computes the open branches of this node from the open branches of its two predecessors,
-     * if that has not happened yet. If only one predecessor has open branches, its list is
-     * taken over as it is; if both have open branches, copies of the two lists are merged.
-     */
-    @Override
-    public void computeUnclosedBranchStack() {
-        if (this.openBranches != null) {
-            return;
-        }
-
-        addClosedBranches(getFirstPredecessorNode().closedBranchingNodes);
-        addClosedBranches(getSecondPredecessorNode().closedBranchingNodes);
-
-        final List<UnclosedBranchDescriptor> firstBranches = getFirstPredecessorNode().openBranches;
-        final List<UnclosedBranchDescriptor> secondBranches = getSecondPredecessorNode().openBranches;
-
-        final boolean firstHasNone = firstBranches == null || firstBranches.isEmpty();
-        final boolean secondHasNone = secondBranches == null || secondBranches.isEmpty();
-
-        if (firstHasNone && secondHasNone) {
-            // neither predecessor has branches: multiple sinks that do not originate
-            // from a common data flow (disconnected flow)
-            this.openBranches = Collections.<UnclosedBranchDescriptor>emptyList();
-        }
-        else if (firstHasNone) {
-            this.openBranches = secondBranches;
-        }
-        else if (secondHasNone) {
-            this.openBranches = firstBranches;
-        }
-        else {
-            // merge copies of the lists, the lists of the predecessors are not handed to the merge
-            final List<UnclosedBranchDescriptor> firstCopy = new ArrayList<UnclosedBranchDescriptor>(firstBranches);
-            final List<UnclosedBranchDescriptor> secondCopy = new ArrayList<UnclosedBranchDescriptor>(secondBranches);
-
-            final ArrayList<UnclosedBranchDescriptor> merged = new ArrayList<UnclosedBranchDescriptor>();
-            mergeLists(firstCopy, secondCopy, merged, false);
-
-            if (merged.isEmpty()) {
-                this.openBranches = Collections.<UnclosedBranchDescriptor>emptyList();
-            } else {
-                this.openBranches = merged;
-            }
-        }
-    }
-
-    /**
-     * @return A single-element list holding a new {@code UtilSinkJoinOpDescriptor}.
-     */
-    @Override
-    protected List<OperatorDescriptorDual> getPossibleProperties() {
-        final OperatorDescriptorDual sinkJoinDescriptor = new UtilSinkJoinOpDescriptor();
-        return Collections.singletonList(sinkJoinDescriptor);
-    }
-
-    @Override
-    public void computeOutputEstimates(DataStatistics statistics) {
-        // intentionally empty: nothing is estimated for this node
-    }
-
-    @Override
-    protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
-        // intentionally empty: no estimates are needed at this point
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java
deleted file mode 100644
index 1292cf5..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.base.DeltaIterationBase.SolutionSetPlaceHolder;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SolutionSetPlanNode;
-
-/**
- * The optimizer's internal representation of the solution set of a workset iteration.
- */
-public class SolutionSetNode extends AbstractPartialSolutionNode {
-
-    /** The workset iteration node that this solution set belongs to. */
-    private final WorksetIterationNode iterationNode;
-
-
-    /**
-     * Creates a solution set node for the given place holder operator.
-     *
-     * @param psph The solution set place holder operator.
-     * @param iterationNode The node of the workset iteration that owns the solution set.
-     */
-    public SolutionSetNode(SolutionSetPlaceHolder<?> psph, WorksetIterationNode iterationNode) {
-        super(psph);
-        this.iterationNode = iterationNode;
-    }
-
-    // --------------------------------------------------------------------------------------------
-
-    /**
-     * Replaces the cached plans of this node by a single {@code SolutionSetPlanNode} with the
-     * given properties and initial input.
-     *
-     * @param gProps The global properties of the plan node.
-     * @param lProps The local properties of the plan node.
-     * @param initialInput The channel handed to the plan node as its initial input.
-     */
-    public void setCandidateProperties(GlobalProperties gProps, LocalProperties lProps, Channel initialInput) {
-        final String planNodeName = "SolutionSet(" + this.getOperator().getName() + ")";
-        final SolutionSetPlanNode planNode = new SolutionSetPlanNode(this, planNodeName, gProps, lProps, initialInput);
-        this.cachedPlans = Collections.<PlanNode>singletonList(planNode);
-    }
-
-    /**
-     * Gets the first of the cached plans as a {@code SolutionSetPlanNode}.
-     *
-     * @return The first cached plan node.
-     *
-     * @throws IllegalStateException Thrown, if no plans are cached.
-     */
-    public SolutionSetPlanNode getCurrentSolutionSetPlanNode() {
-        if (this.cachedPlans == null) {
-            throw new IllegalStateException();
-        }
-        return (SolutionSetPlanNode) this.cachedPlans.get(0);
-    }
-
-    public WorksetIterationNode getIterationNode() {
-        return this.iterationNode;
-    }
-
-    /**
-     * Copies the estimates from the predecessor node of the iteration's initial solution set.
-     */
-    @Override
-    public void computeOutputEstimates(DataStatistics statistics) {
-        copyEstimates(this.iterationNode.getInitialSolutionSetPredecessorNode());
-    }
-
-    // --------------------------------------------------------------------------------------------
-
-    /**
-     * Gets the operator of this node, typed as the solution set place holder.
-     *
-     * @return The solution set place holder operator.
-     */
-    @Override
-    public SolutionSetPlaceHolder<?> getOperator() {
-        return (SolutionSetPlaceHolder<?>) super.getOperator();
-    }
-
-    @Override
-    public String getName() {
-        return "Solution Set";
-    }
-
-    /**
-     * Computes the open branches of this node, if that has not happened yet. The branches are
-     * taken from the source of the iteration node's first incoming connection; an absent or
-     * empty list is stored as the empty list.
-     */
-    @Override
-    public void computeUnclosedBranchStack() {
-        if (this.openBranches == null) {
-            final DagConnection initialInput = this.iterationNode.getFirstIncomingConnection();
-            final OptimizerNode initialSource = initialInput.getSource();
-
-            addClosedBranches(initialSource.closedBranchingNodes);
-            final List<UnclosedBranchDescriptor> inherited = initialSource.getBranchesForParent(initialInput);
-
-            if (inherited != null && !inherited.isEmpty()) {
-                this.openBranches = inherited;
-            } else {
-                this.openBranches = Collections.<UnclosedBranchDescriptor>emptyList();
-            }
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java
deleted file mode 100644
index 83bc39a..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.dag;
-
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
-import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-import java.util.Collections;
-import java.util.List;
-
-/**
- * The optimizer's internal representation of a <i>SortPartition</i> operator node.
- */
-public class SortPartitionNode extends SingleInputNode {
-
-    /** The single-element list with the descriptor of this node, created in the constructor. */
-    private final List<OperatorDescriptorSingle> possibleProperties;
-
-    /**
-     * Creates the node for the given operator. The only operator descriptor of the node is
-     * created from the partition ordering of the operator.
-     *
-     * @param operator The sort partition operator.
-     */
-    public SortPartitionNode(SortPartitionOperatorBase<?> operator) {
-        super(operator);
-
-        this.possibleProperties = Collections.<OperatorDescriptorSingle>singletonList(
-                new SortPartitionDescriptor(operator.getPartitionOrdering()));
-    }
-
-    @Override
-    public SortPartitionOperatorBase<?> getOperator() {
-        return (SortPartitionOperatorBase<?>) super.getOperator();
-    }
-
-    @Override
-    public String getName() {
-        return "Sort-Partition";
-    }
-
-    @Override
-    protected List<OperatorDescriptorSingle> getPossibleProperties() {
-        return this.possibleProperties;
-    }
-
-    /**
-     * Takes over the estimated number of records and the estimated output size of the
-     * predecessor, because sorting does not change the number of records.
-     */
-    @Override
-    protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
-        this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords();
-        this.estimatedOutputSize = getPredecessorNode().getEstimatedOutputSize();
-    }
-
-    /**
-     * @return A new instance of the "all fields forwarded" semantic properties.
-     */
-    @Override
-    public SemanticProperties getSemanticProperties() {
-        return new SingleInputSemanticProperties.AllFieldsForwardedProperties();
-    }
-
-    // --------------------------------------------------------------------------------------------
-
-    /**
-     * The operator descriptor of the sort partition node. It requests the partition ordering as
-     * a local property and passes global and local properties through unchanged.
-     */
-    public static class SortPartitionDescriptor extends OperatorDescriptorSingle {
-
-        /** The ordering that is requested as local property of the input. */
-        private Ordering partitionOrder;
-
-        public SortPartitionDescriptor(Ordering partitionOrder) {
-            this.partitionOrder = partitionOrder;
-        }
-
-        @Override
-        public DriverStrategy getStrategy() {
-            return DriverStrategy.UNARY_NO_OP;
-        }
-
-        @Override
-        public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
-            final SingleInputPlanNode planNode =
-                    new SingleInputPlanNode(node, "Sort-Partition", in, DriverStrategy.UNARY_NO_OP);
-            return planNode;
-        }
-
-        @Override
-        protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
-            // no global property is required: request the default (empty) properties
-            final RequestedGlobalProperties noRequirements = new RequestedGlobalProperties();
-            return Collections.singletonList(noRequirements);
-        }
-
-        @Override
-        protected List<RequestedLocalProperties> createPossibleLocalProperties() {
-            // the partition order is the required local property
-            final RequestedLocalProperties ordered = new RequestedLocalProperties();
-            ordered.setOrdering(this.partitionOrder);
-
-            return Collections.singletonList(ordered);
-        }
-
-        @Override
-        public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
-            // the driver is a no-op, the global properties are returned unchanged
-            return gProps;
-        }
-
-        @Override
-        public LocalProperties computeLocalProperties(LocalProperties lProps) {
-            // the driver is a no-op, the local properties are returned unchanged
-            return lProps;
-        }
-    }
-}