Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:07:31 UTC
[52/53] [abbrv] flink git commit: [FLINK-441] [optimizer] Rename Pact* and Nephele* classes
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/PactCompiler.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/PactCompiler.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/PactCompiler.java
deleted file mode 100644
index d77fe1e..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/PactCompiler.java
+++ /dev/null
@@ -1,1372 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
-import org.apache.flink.optimizer.dag.GroupCombineNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase;
-import org.apache.flink.optimizer.dag.SortPartitionNode;
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.operators.GenericDataSinkBase;
-import org.apache.flink.api.common.operators.GenericDataSourceBase;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.Union;
-import org.apache.flink.api.common.operators.base.BulkIterationBase;
-import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
-import org.apache.flink.api.common.operators.base.CrossOperatorBase;
-import org.apache.flink.api.common.operators.base.DeltaIterationBase;
-import org.apache.flink.api.common.operators.base.FilterOperatorBase;
-import org.apache.flink.api.common.operators.base.FlatMapOperatorBase;
-import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
-import org.apache.flink.api.common.operators.base.MapOperatorBase;
-import org.apache.flink.api.common.operators.base.MapPartitionOperatorBase;
-import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
-import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
-import org.apache.flink.api.common.operators.base.BulkIterationBase.PartialSolutionPlaceHolder;
-import org.apache.flink.api.common.operators.base.DeltaIterationBase.SolutionSetPlaceHolder;
-import org.apache.flink.api.common.operators.base.DeltaIterationBase.WorksetPlaceHolder;
-import org.apache.flink.optimizer.costs.CostEstimator;
-import org.apache.flink.optimizer.costs.DefaultCostEstimator;
-import org.apache.flink.optimizer.dag.BinaryUnionNode;
-import org.apache.flink.optimizer.dag.BulkIterationNode;
-import org.apache.flink.optimizer.dag.BulkPartialSolutionNode;
-import org.apache.flink.optimizer.dag.CoGroupNode;
-import org.apache.flink.optimizer.dag.CollectorMapNode;
-import org.apache.flink.optimizer.dag.CrossNode;
-import org.apache.flink.optimizer.dag.DataSinkNode;
-import org.apache.flink.optimizer.dag.DataSourceNode;
-import org.apache.flink.optimizer.dag.FilterNode;
-import org.apache.flink.optimizer.dag.FlatMapNode;
-import org.apache.flink.optimizer.dag.GroupReduceNode;
-import org.apache.flink.optimizer.dag.IterationNode;
-import org.apache.flink.optimizer.dag.MapNode;
-import org.apache.flink.optimizer.dag.MapPartitionNode;
-import org.apache.flink.optimizer.dag.JoinNode;
-import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.optimizer.dag.PactConnection;
-import org.apache.flink.optimizer.dag.PartitionNode;
-import org.apache.flink.optimizer.dag.ReduceNode;
-import org.apache.flink.optimizer.dag.SinkJoiner;
-import org.apache.flink.optimizer.dag.SolutionSetNode;
-import org.apache.flink.optimizer.dag.TempMode;
-import org.apache.flink.optimizer.dag.WorksetIterationNode;
-import org.apache.flink.optimizer.dag.WorksetNode;
-import org.apache.flink.optimizer.deadlockdetect.DeadlockPreventer;
-import org.apache.flink.optimizer.plan.BinaryUnionPlanNode;
-import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
-import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.IterationPlanNode;
-import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SinkJoinerPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plan.SolutionSetPlanNode;
-import org.apache.flink.optimizer.plan.SourcePlanNode;
-import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
-import org.apache.flink.optimizer.plan.WorksetPlanNode;
-import org.apache.flink.optimizer.postpass.OptimizerPostPass;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Visitor;
-
-/**
- * The optimizer that takes the user specified program plan and creates an optimized plan that contains
- * exact descriptions about how the physical execution will take place. It first translates the user
- * program into an internal optimizer representation and then chooses between different alternatives
- * for shipping strategies and local strategies.
- * <p>
- * The basic principle is taken from optimizer works in systems such as Volcano/Cascades and Selinger/System-R/DB2. The
- * optimizer walks from the sinks down, generating interesting properties, and ascends from the sources generating
- * alternative plans, pruning against the interesting properties.
- * <p>
- * The optimizer also assigns the memory to the individual tasks. This is currently done in a very simple fashion: All
- * sub-tasks that need memory (e.g. reduce or join) are given an equal share of memory.
- */
-public class PactCompiler {
-
- // ------------------------------------------------------------------------
- // Constants
- // ------------------------------------------------------------------------
-
- /**
- * Compiler hint key for the input channel's shipping strategy. This String is a key to the operator's stub
- * parameters. The corresponding value tells the compiler which shipping strategy to use for the input channel.
- * If the operator has two input channels, the shipping strategy is applied to both input channels.
- */
- public static final String HINT_SHIP_STRATEGY = "INPUT_SHIP_STRATEGY";
-
- /**
- * Compiler hint key for the <b>first</b> input channel's shipping strategy. This String is a key to
- * the operator's stub parameters. The corresponding value tells the compiler which shipping strategy
- * to use for the <b>first</b> input channel. Only applicable to operators with two inputs.
- */
- public static final String HINT_SHIP_STRATEGY_FIRST_INPUT = "INPUT_LEFT_SHIP_STRATEGY";
-
- /**
- * Compiler hint key for the <b>second</b> input channel's shipping strategy. This String is a key to
- * the operator's stub parameters. The corresponding value tells the compiler which shipping strategy
- * to use for the <b>second</b> input channel. Only applicable to operators with two inputs.
- */
- public static final String HINT_SHIP_STRATEGY_SECOND_INPUT = "INPUT_RIGHT_SHIP_STRATEGY";
-
- /**
- * Value for the shipping strategy compiler hint that enforces a <b>Forward</b> strategy on the
- * input channel, i.e. no redistribution of any kind.
- *
- * @see #HINT_SHIP_STRATEGY
- * @see #HINT_SHIP_STRATEGY_FIRST_INPUT
- * @see #HINT_SHIP_STRATEGY_SECOND_INPUT
- */
- public static final String HINT_SHIP_STRATEGY_FORWARD = "SHIP_FORWARD";
-
- /**
- * Value for the shipping strategy compiler hint that enforces a random repartition strategy.
- *
- * @see #HINT_SHIP_STRATEGY
- * @see #HINT_SHIP_STRATEGY_FIRST_INPUT
- * @see #HINT_SHIP_STRATEGY_SECOND_INPUT
- */
- public static final String HINT_SHIP_STRATEGY_REPARTITION = "SHIP_REPARTITION";
-
- /**
- * Value for the shipping strategy compiler hint that enforces a hash-partition strategy.
- *
- * @see #HINT_SHIP_STRATEGY
- * @see #HINT_SHIP_STRATEGY_FIRST_INPUT
- * @see #HINT_SHIP_STRATEGY_SECOND_INPUT
- */
- public static final String HINT_SHIP_STRATEGY_REPARTITION_HASH = "SHIP_REPARTITION_HASH";
-
- /**
- * Value for the shipping strategy compiler hint that enforces a range-partition strategy.
- *
- * @see #HINT_SHIP_STRATEGY
- * @see #HINT_SHIP_STRATEGY_FIRST_INPUT
- * @see #HINT_SHIP_STRATEGY_SECOND_INPUT
- */
- public static final String HINT_SHIP_STRATEGY_REPARTITION_RANGE = "SHIP_REPARTITION_RANGE";
-
- /**
- * Value for the shipping strategy compiler hint that enforces a <b>broadcast</b> strategy on the
- * input channel.
- *
- * @see #HINT_SHIP_STRATEGY
- * @see #HINT_SHIP_STRATEGY_FIRST_INPUT
- * @see #HINT_SHIP_STRATEGY_SECOND_INPUT
- */
- public static final String HINT_SHIP_STRATEGY_BROADCAST = "SHIP_BROADCAST";
-
- /**
- * Compiler hint key for the operator's local strategy. This String is a key to the operator's stub
- * parameters. The corresponding value tells the compiler which local strategy to use to process the
- * data inside one partition.
- * <p>
- * This hint is ignored by operators that do not have a local strategy (such as <i>Map</i>), or by operators that
- * have no choice in their local strategy (such as <i>Cross</i>).
- */
- public static final String HINT_LOCAL_STRATEGY = "LOCAL_STRATEGY";
-
- /**
- * Value for the local strategy compiler hint that enforces a <b>sort based</b> local strategy.
- * For example, a <i>Reduce</i> operator will sort the data to group it.
- *
- * @see #HINT_LOCAL_STRATEGY
- */
- public static final String HINT_LOCAL_STRATEGY_SORT = "LOCAL_STRATEGY_SORT";
-
- /**
- * Value for the local strategy compiler hint that enforces a <b>sort based</b> local strategy.
- * During sorting a combine method is repeatedly applied to reduce the data volume.
- * For example, a <i>Reduce</i> operator will sort the data to group it.
- *
- * @see #HINT_LOCAL_STRATEGY
- */
- public static final String HINT_LOCAL_STRATEGY_COMBINING_SORT = "LOCAL_STRATEGY_COMBINING_SORT";
-
- /**
- * Value for the local strategy compiler hint that enforces a <b>sort merge based</b> local strategy on both
- * inputs with subsequent merging of inputs.
- * For example, a <i>Match</i> or <i>CoGroup</i> operator will use a sort-merge strategy to find pairs
- * of matching keys.
- *
- * @see #HINT_LOCAL_STRATEGY
- */
- public static final String HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE = "LOCAL_STRATEGY_SORT_BOTH_MERGE";
-
- /**
- * Value for the local strategy compiler hint that enforces a <b>sort merge based</b> local strategy.
- * The first input is sorted, while the second input is assumed to be already sorted. After sorting, both inputs are merged.
- * For example, a <i>Match</i> or <i>CoGroup</i> operator will use a sort-merge strategy to find pairs
- * of matching keys.
- *
- * @see #HINT_LOCAL_STRATEGY
- */
- public static final String HINT_LOCAL_STRATEGY_SORT_FIRST_MERGE = "LOCAL_STRATEGY_SORT_FIRST_MERGE";
-
- /**
- * Value for the local strategy compiler hint that enforces a <b>sort merge based</b> local strategy.
- * The second input is sorted, while the first input is assumed to be already sorted. After sorting, both inputs are merged.
- * For example, a <i>Match</i> or <i>CoGroup</i> operator will use a sort-merge strategy to find pairs
- * of matching keys.
- *
- * @see #HINT_LOCAL_STRATEGY
- */
- public static final String HINT_LOCAL_STRATEGY_SORT_SECOND_MERGE = "LOCAL_STRATEGY_SORT_SECOND_MERGE";
-
- /**
- * Value for the local strategy compiler hint that enforces a <b>merge based</b> local strategy.
- * Both inputs are assumed to be sorted and are merged.
- * For example, a <i>Match</i> or <i>CoGroup</i> operator will use a merge strategy to find pairs
- * of matching keys.
- *
- * @see #HINT_LOCAL_STRATEGY
- */
- public static final String HINT_LOCAL_STRATEGY_MERGE = "LOCAL_STRATEGY_MERGE";
-
-
- /**
- * Value for the local strategy compiler hint that enforces a <b>hash based</b> local strategy.
- * For example, a <i>Match</i> operator will use a hybrid-hash-join strategy to find pairs of
- * matching keys. The <b>first</b> input will be used to build the hash table, the second input will be
- * used to probe the table.
- *
- * @see #HINT_LOCAL_STRATEGY
- */
- public static final String HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST = "LOCAL_STRATEGY_HASH_BUILD_FIRST";
-
- /**
- * Value for the local strategy compiler hint that enforces a <b>hash based</b> local strategy.
- * For example, a <i>Match</i> operator will use a hybrid-hash-join strategy to find pairs of
- * matching keys. The <b>second</b> input will be used to build the hash table, the first input will be
- * used to probe the table.
- *
- * @see #HINT_LOCAL_STRATEGY
- */
- public static final String HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND = "LOCAL_STRATEGY_HASH_BUILD_SECOND";
-
- /**
- * Value for the local strategy compiler hint that chooses the outer side of the <b>nested-loop</b> local strategy.
- * A <i>Cross</i> operator will process the data of the <b>first</b> input in the outer-loop of the nested loops.
- * Hence, the data of the first input is streamed through, while the data of the second input is stored on disk
- * and repeatedly read.
- *
- * @see #HINT_LOCAL_STRATEGY
- */
- public static final String HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST = "LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST";
-
- /**
- * Value for the local strategy compiler hint that chooses the outer side of the <b>nested-loop</b> local strategy.
- * A <i>Cross</i> operator will process the data of the <b>second</b> input in the outer-loop of the nested loops.
- * Hence, the data of the second input is streamed through, while the data of the first input is stored on disk
- * and repeatedly read.
- *
- * @see #HINT_LOCAL_STRATEGY
- */
- public static final String HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND = "LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND";
-
- /**
- * Value for the local strategy compiler hint that chooses the outer side of the <b>nested-loop</b> local strategy.
- * A <i>Cross</i> operator will process the data of the <b>first</b> input in the outer-loop of the nested loops.
- * Furthermore, the first input, being the outer side, will be processed in blocks, and for each block, the second
- * input, being the inner side, will be read repeatedly from disk.
- *
- * @see #HINT_LOCAL_STRATEGY
- */
- public static final String HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST = "LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST";
-
- /**
- * Value for the local strategy compiler hint that chooses the outer side of the <b>nested-loop</b> local strategy.
- * A <i>Cross</i> operator will process the data of the <b>second</b> input in the outer-loop of the nested loops.
- * Furthermore, the second input, being the outer side, will be processed in blocks, and for each block, the first
- * input, being the inner side, will be read repeatedly from disk.
- *
- * @see #HINT_LOCAL_STRATEGY
- */
- public static final String HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND = "LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND";
-
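For illustration only, a minimal sketch of how these hint constants would be attached to an operator's stub parameters. The HINT_* keys and values are the constants defined above; the two-input operator variable and its getParameters() accessor are assumptions made for this fragment, and Configuration#setString is used to write the key/value pairs.

    // Hypothetical fragment: attach compiler hints to an operator's stub parameters.
    // 'matchOperator' and its getParameters() accessor are assumed for this sketch.
    Configuration stubParams = matchOperator.getParameters();
    // broadcast the first input to all instances of the operator
    stubParams.setString(PactCompiler.HINT_SHIP_STRATEGY_FIRST_INPUT, PactCompiler.HINT_SHIP_STRATEGY_BROADCAST);
    // build the hash table on the (broadcast) first input, probe with the second
    stubParams.setString(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST);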
- /**
- * The log handle that is used by the compiler to log messages.
- */
- public static final Logger LOG = LoggerFactory.getLogger(PactCompiler.class);
-
- // ------------------------------------------------------------------------
- // Members
- // ------------------------------------------------------------------------
-
- /**
- * The statistics object used to obtain statistics, such as input sizes,
- * for the cost estimation process.
- */
- private final DataStatistics statistics;
-
- /**
- * The cost estimator used by the compiler.
- */
- private final CostEstimator costEstimator;
-
- /**
- * The default degree of parallelism for jobs compiled by this compiler.
- */
- private int defaultDegreeOfParallelism;
-
-
- // ------------------------------------------------------------------------
- // Constructor & Setup
- // ------------------------------------------------------------------------
-
- /**
- * Creates a new optimizer instance. The optimizer has no access to statistics about the
- * inputs and can hence not determine any properties. It will perform all optimization with
- * unknown sizes and hence use only the heuristic cost functions, which result in the selection
- * of the most robust execution strategies.
- */
- public PactCompiler() {
- this(null, new DefaultCostEstimator());
- }
-
- /**
- * Creates a new optimizer instance that uses the statistics object to determine properties about the input.
- * Given those statistics, the optimizer can make better choices for the execution strategies.
- *
- * @param stats
- * The statistics to be used to determine the input properties.
- */
- public PactCompiler(DataStatistics stats) {
- this(stats, new DefaultCostEstimator());
- }
-
- /**
- * Creates a new optimizer instance. The optimizer has no access to statistics about the
- * inputs and can hence not determine any properties. It will perform all optimization with
- * unknown sizes and hence use only the heuristic cost functions, which result in the selection
- * of the most robust execution strategies.
- *
- * The optimizer uses the given cost estimator to compute the costs of the individual operations.
- *
- * @param estimator The cost estimator to use to cost the individual operations.
- */
- public PactCompiler(CostEstimator estimator) {
- this(null, estimator);
- }
-
- /**
- * Creates a new optimizer instance that uses the statistics object to determine properties about the input.
- * Given those statistics, the optimizer can make better choices for the execution strategies.
- *
- * The optimizer uses the given cost estimator to compute the costs of the individual operations.
- *
- * @param stats
- * The statistics to be used to determine the input properties.
- * @param estimator
- * The <tt>CostEstimator</tt> to use to cost the individual operations.
- */
- public PactCompiler(DataStatistics stats, CostEstimator estimator) {
- this.statistics = stats;
- this.costEstimator = estimator;
-
- // determine the default parallelism
- this.defaultDegreeOfParallelism = GlobalConfiguration.getInteger(
- ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY,
- ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE);
-
- if (defaultDegreeOfParallelism < 1) {
- LOG.warn("Config value " + defaultDegreeOfParallelism + " for option "
- + ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY + " is invalid. Ignoring and using a value of 1.");
- this.defaultDegreeOfParallelism = 1;
- }
- }
-
- // ------------------------------------------------------------------------
- // Getters / Setters
- // ------------------------------------------------------------------------
-
- public int getDefaultDegreeOfParallelism() {
- return defaultDegreeOfParallelism;
- }
-
- public void setDefaultDegreeOfParallelism(int defaultDegreeOfParallelism) {
- if (defaultDegreeOfParallelism > 0) {
- this.defaultDegreeOfParallelism = defaultDegreeOfParallelism;
- } else {
- throw new IllegalArgumentException("Default parallelism cannot be zero or negative.");
- }
- }
-
- // ------------------------------------------------------------------------
- // Compilation
- // ------------------------------------------------------------------------
-
- /**
- * Translates the given program to an OptimizedPlan, where all nodes have their local strategy assigned
- * and all channels have a shipping strategy assigned.
- *
- * For more details on the optimization phase, see the comments for
- * {@link #compile(org.apache.flink.api.common.Plan, org.apache.flink.optimizer.postpass.OptimizerPostPass)}.
- *
- * @param program The program to be translated.
- * @return The optimized plan.
- *
- * @throws CompilerException
- * Thrown if the plan is invalid or the optimizer encountered an inconsistent
- * situation during the compilation process.
- */
- public OptimizedPlan compile(Plan program) throws CompilerException {
- final OptimizerPostPass postPasser = getPostPassFromPlan(program);
- return compile(program, postPasser);
- }
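As a usage sketch of the method above (and the constructors further up): with no DataStatistics the compiler falls back to heuristic cost functions. The 'program' variable is assumed to be an already-constructed org.apache.flink.api.common.Plan; it is not defined in this file.

    // Minimal sketch: compile a program without input statistics.
    PactCompiler compiler = new PactCompiler();                // heuristic cost functions only
    OptimizedPlan optimizedPlan = compiler.compile(program);   // 'program' is an assumed, pre-built Plan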
-
- /**
- * Translates the given program to an OptimizedPlan. The optimized plan describes for each operator
- * which strategy to use (such as hash join versus sort-merge join), what data exchange method to use
- * (local pipe forward, shuffle, broadcast), what exchange mode to use (pipelined, batch),
- * where to cache intermediate results, etc.
- *
- * The optimization happens in multiple phases:
- * <ol>
- * <li>Create the optimizer DAG representation of the program: build <tt>OptimizerNode</tt> representations
- * of the operators (PACTs), assign parallelism, and compute size estimates.</li>
- * <li>Compute interesting properties and auxiliary structures.</li>
- * <li>Enumerate plan alternatives. This cannot be done in the same step as the interesting property computation (as
- * opposed to the Database approaches), because we support plans that are not trees.</li>
- * </ol>
- *
- * @param program The program to be translated.
- * @param postPasser The function to be used for post passing the optimizer's plan and setting the
- * data type specific serialization routines.
- * @return The optimized plan.
- *
- * @throws CompilerException
- * Thrown if the plan is invalid or the optimizer encountered an inconsistent
- * situation during the compilation process.
- */
- private OptimizedPlan compile(Plan program, OptimizerPostPass postPasser) throws CompilerException {
- if (program == null || postPasser == null) {
- throw new NullPointerException();
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Beginning compilation of program '" + program.getJobName() + '\'');
- }
-
- final ExecutionMode defaultDataExchangeMode = program.getExecutionConfig().getExecutionMode();
-
- final int defaultParallelism = program.getDefaultParallelism() > 0 ?
- program.getDefaultParallelism() : this.defaultDegreeOfParallelism;
-
- // log the default settings
- LOG.debug("Using a default parallelism of {}", defaultParallelism);
- LOG.debug("Using default data exchange mode {}", defaultDataExchangeMode);
-
- // the first step in the compilation is to create the optimizer plan representation
- // this step does the following:
- // 1) It creates an optimizer plan node for each operator
- // 2) It connects them via channels
- // 3) It looks for hints about local strategies and channel types and
- // sets the types and strategies accordingly
- // 4) It makes estimates about the data volume of the data sources and
- // propagates those estimates through the plan
-
- GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(defaultParallelism, defaultDataExchangeMode);
- program.accept(graphCreator);
-
- // if we have a plan with multiple data sinks, add logical optimizer nodes that have two data-sinks as children
- // each, until we have only a single root node. This allows us to transparently deal with nodes that have
- // multiple outputs
- OptimizerNode rootNode;
- if (graphCreator.sinks.size() == 1) {
- rootNode = graphCreator.sinks.get(0);
- } else if (graphCreator.sinks.size() > 1) {
- Iterator<DataSinkNode> iter = graphCreator.sinks.iterator();
- rootNode = iter.next();
-
- while (iter.hasNext()) {
- rootNode = new SinkJoiner(rootNode, iter.next());
- }
- } else {
- throw new CompilerException("Bug: The optimizer plan representation has no sinks.");
- }
-
- // now that we have all nodes created and recorded which ones consume memory, tell the nodes their minimal
- // guaranteed memory, for further cost estimations. we assume an equal distribution of memory among consumer tasks
- rootNode.accept(new IdAndEstimatesVisitor(this.statistics));
-
- // We are dealing with operator DAGs, rather than operator trees.
- // That requires us to deviate at some points from the classical DB optimizer algorithms.
- // This step builds some auxiliary structures to help track branches and joins in the DAG
- BranchesVisitor branchingVisitor = new BranchesVisitor();
- rootNode.accept(branchingVisitor);
-
- // Propagate the interesting properties top-down through the graph
- InterestingPropertyVisitor propsVisitor = new InterestingPropertyVisitor(this.costEstimator);
- rootNode.accept(propsVisitor);
-
- // perform a sanity check: the root may not have any unclosed branches
- if (rootNode.getOpenBranches() != null && rootNode.getOpenBranches().size() > 0) {
- throw new CompilerException("Bug: Logic for branching plans (non-tree plans) has an error, and does not " +
- "track the re-joining of branches correctly.");
- }
-
- // the final step is now to generate the actual plan alternatives
- List<PlanNode> bestPlan = rootNode.getAlternativePlans(this.costEstimator);
-
- if (bestPlan.size() != 1) {
- throw new CompilerException("Error in compiler: more than one best plan was created!");
- }
-
- // check if the best plan's root is a data sink (single sink plan)
- // if so, directly take it. if it is a sink joiner node, get its contained sinks
- PlanNode bestPlanRoot = bestPlan.get(0);
- List<SinkPlanNode> bestPlanSinks = new ArrayList<SinkPlanNode>(4);
-
- if (bestPlanRoot instanceof SinkPlanNode) {
- bestPlanSinks.add((SinkPlanNode) bestPlanRoot);
- } else if (bestPlanRoot instanceof SinkJoinerPlanNode) {
- ((SinkJoinerPlanNode) bestPlanRoot).getDataSinks(bestPlanSinks);
- }
-
- DeadlockPreventer dp = new DeadlockPreventer();
- dp.resolveDeadlocks(bestPlanSinks);
-
- // finalize the plan
- OptimizedPlan plan = new PlanFinalizer().createFinalPlan(bestPlanSinks, program.getJobName(), program);
-
- plan.accept(new BinaryUnionReplacer());
-
- // post pass the plan. this is the phase where the serialization and comparator code is set
- postPasser.postPass(plan);
-
- return plan;
- }
-
- /**
- * This function performs only the first step of the compilation process - the creation of the optimizer
- * representation of the plan. No estimations or enumerations of alternatives are done here.
- *
- * @param program The plan to generate the optimizer representation for.
- * @return The optimizer representation of the plan, as a collection of all data sinks
- * from which the plan can be traversed.
- */
- public static List<DataSinkNode> createPreOptimizedPlan(Plan program) {
- GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(1, null);
- program.accept(graphCreator);
- return graphCreator.sinks;
- }
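A one-line usage sketch of the method above, which runs only the translation step (no estimates beyond the initial ones, no enumeration of alternatives); again, 'program' is an assumed, pre-built Plan:

    // Translate the program into the optimizer representation only, e.g. for plan preview purposes.
    List<DataSinkNode> preOptimizedSinks = PactCompiler.createPreOptimizedPlan(program);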
-
- // ------------------------------------------------------------------------
- // Visitors for Compilation Traversals
- // ------------------------------------------------------------------------
-
- /**
- * This utility class performs the translation from the user specified program to the optimizer plan.
- * It works as a visitor that walks the user's job in a depth-first fashion. During the descent, it creates
- * an optimizer node for each operator (or data source or sink, respectively). During the ascent, it connects
- * the nodes to the full graph.
- * <p>
- * This translator relies on the <code>setInputs</code> method in the nodes. As that method implements the size
- * estimation and the awareness for optimizer hints, the sizes will be properly estimated and the translated plan
- * already respects all optimizer hints.
- */
- public static final class GraphCreatingVisitor implements Visitor<Operator<?>> {
-
- private final Map<Operator<?>, OptimizerNode> con2node; // map from the operator objects to their
- // corresponding optimizer nodes
-
- private final List<DataSinkNode> sinks; // all data sink nodes in the optimizer plan
-
- private final int defaultParallelism; // the default degree of parallelism
-
- private final GraphCreatingVisitor parent; // reference to enclosing creator, in case of a recursive translation
-
- private final ExecutionMode defaultDataExchangeMode;
-
- private final boolean forceDOP;
-
-
- public GraphCreatingVisitor(int defaultParallelism, ExecutionMode defaultDataExchangeMode) {
- this(null, false, defaultParallelism, defaultDataExchangeMode, null);
- }
-
- private GraphCreatingVisitor(GraphCreatingVisitor parent, boolean forceDOP, int defaultParallelism,
- ExecutionMode dataExchangeMode, HashMap<Operator<?>, OptimizerNode> closure) {
- if (closure == null){
- con2node = new HashMap<Operator<?>, OptimizerNode>();
- } else {
- con2node = closure;
- }
-
- this.sinks = new ArrayList<DataSinkNode>(2);
- this.defaultParallelism = defaultParallelism;
- this.parent = parent;
- this.defaultDataExchangeMode = dataExchangeMode;
- this.forceDOP = forceDOP;
- }
-
- public List<DataSinkNode> getSinks() {
- return sinks;
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public boolean preVisit(Operator<?> c) {
- // check if we have been here before
- if (this.con2node.containsKey(c)) {
- return false;
- }
-
- final OptimizerNode n;
-
- // create a node for the operator (or sink or source) if we have not been here before
- if (c instanceof GenericDataSinkBase) {
- DataSinkNode dsn = new DataSinkNode((GenericDataSinkBase<?>) c);
- this.sinks.add(dsn);
- n = dsn;
- }
- else if (c instanceof GenericDataSourceBase) {
- n = new DataSourceNode((GenericDataSourceBase<?, ?>) c);
- }
- else if (c instanceof MapOperatorBase) {
- n = new MapNode((MapOperatorBase<?, ?, ?>) c);
- }
- else if (c instanceof MapPartitionOperatorBase) {
- n = new MapPartitionNode((MapPartitionOperatorBase<?, ?, ?>) c);
- }
- else if (c instanceof org.apache.flink.api.common.operators.base.CollectorMapOperatorBase) {
- n = new CollectorMapNode((org.apache.flink.api.common.operators.base.CollectorMapOperatorBase<?, ?, ?>) c);
- }
- else if (c instanceof FlatMapOperatorBase) {
- n = new FlatMapNode((FlatMapOperatorBase<?, ?, ?>) c);
- }
- else if (c instanceof FilterOperatorBase) {
- n = new FilterNode((FilterOperatorBase<?, ?>) c);
- }
- else if (c instanceof ReduceOperatorBase) {
- n = new ReduceNode((ReduceOperatorBase<?, ?>) c);
- }
- else if (c instanceof GroupReduceOperatorBase) {
- n = new GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) c);
- }
- else if (c instanceof GroupCombineOperatorBase) {
- n = new GroupCombineNode((GroupCombineOperatorBase<?, ?, ?>) c);
- }
- else if (c instanceof JoinOperatorBase) {
- n = new JoinNode((JoinOperatorBase<?, ?, ?, ?>) c);
- }
- else if (c instanceof CoGroupOperatorBase) {
- n = new CoGroupNode((CoGroupOperatorBase<?, ?, ?, ?>) c);
- }
- else if (c instanceof CrossOperatorBase) {
- n = new CrossNode((CrossOperatorBase<?, ?, ?, ?>) c);
- }
- else if (c instanceof BulkIterationBase) {
- n = new BulkIterationNode((BulkIterationBase<?>) c);
- }
- else if (c instanceof DeltaIterationBase) {
- n = new WorksetIterationNode((DeltaIterationBase<?, ?>) c);
- }
- else if (c instanceof Union){
- n = new BinaryUnionNode((Union<?>) c);
- }
- else if (c instanceof PartitionOperatorBase) {
- n = new PartitionNode((PartitionOperatorBase<?>) c);
- }
- else if (c instanceof SortPartitionOperatorBase) {
- n = new SortPartitionNode((SortPartitionOperatorBase<?>) c);
- }
- else if (c instanceof PartialSolutionPlaceHolder) {
- if (this.parent == null) {
- throw new InvalidProgramException("It is currently not supported to create data sinks inside iterations.");
- }
-
- final PartialSolutionPlaceHolder<?> holder = (PartialSolutionPlaceHolder<?>) c;
- final BulkIterationBase<?> enclosingIteration = holder.getContainingBulkIteration();
- final BulkIterationNode containingIterationNode =
- (BulkIterationNode) this.parent.con2node.get(enclosingIteration);
-
- // catch this for the recursive translation of step functions
- BulkPartialSolutionNode p = new BulkPartialSolutionNode(holder, containingIterationNode);
- p.setDegreeOfParallelism(containingIterationNode.getDegreeOfParallelism());
- n = p;
- }
- else if (c instanceof WorksetPlaceHolder) {
- if (this.parent == null) {
- throw new InvalidProgramException("It is currently not supported to create data sinks inside iterations.");
- }
-
- final WorksetPlaceHolder<?> holder = (WorksetPlaceHolder<?>) c;
- final DeltaIterationBase<?, ?> enclosingIteration = holder.getContainingWorksetIteration();
- final WorksetIterationNode containingIterationNode =
- (WorksetIterationNode) this.parent.con2node.get(enclosingIteration);
-
- // catch this for the recursive translation of step functions
- WorksetNode p = new WorksetNode(holder, containingIterationNode);
- p.setDegreeOfParallelism(containingIterationNode.getDegreeOfParallelism());
- n = p;
- }
- else if (c instanceof SolutionSetPlaceHolder) {
- if (this.parent == null) {
- throw new InvalidProgramException("It is currently not supported to create data sinks inside iterations.");
- }
-
- final SolutionSetPlaceHolder<?> holder = (SolutionSetPlaceHolder<?>) c;
- final DeltaIterationBase<?, ?> enclosingIteration = holder.getContainingWorksetIteration();
- final WorksetIterationNode containingIterationNode =
- (WorksetIterationNode) this.parent.con2node.get(enclosingIteration);
-
- // catch this for the recursive translation of step functions
- SolutionSetNode p = new SolutionSetNode(holder, containingIterationNode);
- p.setDegreeOfParallelism(containingIterationNode.getDegreeOfParallelism());
- n = p;
- }
- else {
- throw new IllegalArgumentException("Unknown operator type: " + c);
- }
-
- this.con2node.put(c, n);
-
- // set the parallelism only if it has not been set before. some nodes have a fixed DOP, such as the
- // key-less reducer (all-reduce)
- if (n.getDegreeOfParallelism() < 1) {
- // set the degree of parallelism
- int par = c.getDegreeOfParallelism();
- if (par > 0) {
- if (this.forceDOP && par != this.defaultParallelism) {
- par = this.defaultParallelism;
- LOG.warn("The parallelism of nested dataflows (such as step functions in iterations) is " +
- "currently fixed to the parallelism of the surrounding operator (the iteration).");
- }
- } else {
- par = this.defaultParallelism;
- }
- n.setDegreeOfParallelism(par);
- }
-
- return true;
- }
-
- @Override
- public void postVisit(Operator<?> c) {
-
- OptimizerNode n = this.con2node.get(c);
-
- // first connect to the predecessors
- n.setInput(this.con2node, this.defaultDataExchangeMode);
- n.setBroadcastInputs(this.con2node, this.defaultDataExchangeMode);
-
- // if the node represents a bulk iteration, we recursively translate the data flow now
- if (n instanceof BulkIterationNode) {
- final BulkIterationNode iterNode = (BulkIterationNode) n;
- final BulkIterationBase<?> iter = iterNode.getIterationContract();
-
- // pass a copy of the non-iterative part into the iteration translation,
- // in case the iteration references its closure
- HashMap<Operator<?>, OptimizerNode> closure = new HashMap<Operator<?>, OptimizerNode>(con2node);
-
- // first, recursively build the data flow for the step function
- final GraphCreatingVisitor recursiveCreator = new GraphCreatingVisitor(this, true,
- iterNode.getDegreeOfParallelism(), defaultDataExchangeMode, closure);
-
- BulkPartialSolutionNode partialSolution;
-
- iter.getNextPartialSolution().accept(recursiveCreator);
-
- partialSolution = (BulkPartialSolutionNode) recursiveCreator.con2node.get(iter.getPartialSolution());
- OptimizerNode rootOfStepFunction = recursiveCreator.con2node.get(iter.getNextPartialSolution());
- if (partialSolution == null) {
- throw new CompilerException("Error: The step functions result does not depend on the partial solution.");
- }
-
-
- OptimizerNode terminationCriterion = null;
-
- if (iter.getTerminationCriterion() != null) {
- terminationCriterion = recursiveCreator.con2node.get(iter.getTerminationCriterion());
-
- // no intermediate node yet, traverse from the termination criterion to build the missing parts
- if (terminationCriterion == null) {
- iter.getTerminationCriterion().accept(recursiveCreator);
- terminationCriterion = recursiveCreator.con2node.get(iter.getTerminationCriterion());
- }
- }
-
- iterNode.setPartialSolution(partialSolution);
- iterNode.setNextPartialSolution(rootOfStepFunction, terminationCriterion);
-
- // go over the contained data flow and mark the dynamic path nodes
- StaticDynamicPathIdentifier identifier = new StaticDynamicPathIdentifier(iterNode.getCostWeight());
- iterNode.acceptForStepFunction(identifier);
- }
- else if (n instanceof WorksetIterationNode) {
- final WorksetIterationNode iterNode = (WorksetIterationNode) n;
- final DeltaIterationBase<?, ?> iter = iterNode.getIterationContract();
-
- // we need to ensure that both the next-workset and the solution-set-delta depend on the workset.
- // One check comes for free during the translation; we do the other check here as a precondition
- {
- StepFunctionValidator wsf = new StepFunctionValidator();
- iter.getNextWorkset().accept(wsf);
- if (!wsf.foundWorkset) {
- throw new CompilerException("In the given program, the next workset does not depend on the workset. " +
- "This is a prerequisite in delta iterations.");
- }
- }
-
- // calculate the closure of the anonymous function
- HashMap<Operator<?>, OptimizerNode> closure = new HashMap<Operator<?>, OptimizerNode>(con2node);
-
- // first, recursively build the data flow for the step function
- final GraphCreatingVisitor recursiveCreator = new GraphCreatingVisitor(
- this, true, iterNode.getDegreeOfParallelism(), defaultDataExchangeMode, closure);
-
- // descend from the solution set delta. check that it depends on both the workset
- // and the solution set. If it does depend on both, this descent should create both nodes
- iter.getSolutionSetDelta().accept(recursiveCreator);
-
- final WorksetNode worksetNode = (WorksetNode) recursiveCreator.con2node.get(iter.getWorkset());
-
- if (worksetNode == null) {
- throw new CompilerException("In the given program, the solution set delta does not depend on the workset." +
- "This is a prerequisite in delta iterations.");
- }
-
- iter.getNextWorkset().accept(recursiveCreator);
-
- SolutionSetNode solutionSetNode = (SolutionSetNode) recursiveCreator.con2node.get(iter.getSolutionSet());
-
- if (solutionSetNode == null || solutionSetNode.getOutgoingConnections() == null || solutionSetNode.getOutgoingConnections().isEmpty()) {
- solutionSetNode = new SolutionSetNode((SolutionSetPlaceHolder<?>) iter.getSolutionSet(), iterNode);
- }
- else {
- for (PactConnection conn : solutionSetNode.getOutgoingConnections()) {
- OptimizerNode successor = conn.getTarget();
-
- if (successor.getClass() == JoinNode.class) {
- // find out which input of the join the solution set is
- JoinNode mn = (JoinNode) successor;
- if (mn.getFirstPredecessorNode() == solutionSetNode) {
- mn.makeJoinWithSolutionSet(0);
- } else if (mn.getSecondPredecessorNode() == solutionSetNode) {
- mn.makeJoinWithSolutionSet(1);
- } else {
- throw new CompilerException();
- }
- }
- else if (successor.getClass() == CoGroupNode.class) {
- CoGroupNode cg = (CoGroupNode) successor;
- if (cg.getFirstPredecessorNode() == solutionSetNode) {
- cg.makeCoGroupWithSolutionSet(0);
- } else if (cg.getSecondPredecessorNode() == solutionSetNode) {
- cg.makeCoGroupWithSolutionSet(1);
- } else {
- throw new CompilerException();
- }
- }
- else {
- throw new InvalidProgramException(
- "Error: The only operations allowed on the solution set are Join and CoGroup.");
- }
- }
- }
-
- final OptimizerNode nextWorksetNode = recursiveCreator.con2node.get(iter.getNextWorkset());
- final OptimizerNode solutionSetDeltaNode = recursiveCreator.con2node.get(iter.getSolutionSetDelta());
-
- // set the step function nodes to the iteration node
- iterNode.setPartialSolution(solutionSetNode, worksetNode);
- iterNode.setNextPartialSolution(solutionSetDeltaNode, nextWorksetNode, defaultDataExchangeMode);
-
- // go over the contained data flow and mark the dynamic path nodes
- StaticDynamicPathIdentifier pathIdentifier = new StaticDynamicPathIdentifier(iterNode.getCostWeight());
- iterNode.acceptForStepFunction(pathIdentifier);
- }
- }
- }
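The traversals below (StaticDynamicPathIdentifier, IdAndEstimatesVisitor, InterestingPropertyVisitor, BranchesVisitor) all follow the same Visitor contract used throughout this class: preVisit() decides whether to descend, postVisit() runs on the way back up. A minimal, hypothetical sketch of that contract — a visitor that merely counts the distinct nodes of the DAG (not part of the original code):

    // Hypothetical example of the Visitor<OptimizerNode> contract used by the traversals in this class.
    final class NodeCountingVisitor implements Visitor<OptimizerNode> {

        private final Set<OptimizerNode> seen = new HashSet<OptimizerNode>();
        private int count;

        @Override
        public boolean preVisit(OptimizerNode node) {
            return this.seen.add(node);   // descend only on the first encounter, since the plan is a DAG
        }

        @Override
        public void postVisit(OptimizerNode node) {
            this.count++;                 // count the node once all of its inputs have been visited
        }

        public int getCount() {
            return this.count;
        }
    }

It would be applied the same way as the visitors above, e.g. rootNode.accept(new NodeCountingVisitor()).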
-
- private static final class StaticDynamicPathIdentifier implements Visitor<OptimizerNode> {
-
- private final Set<OptimizerNode> seenBefore = new HashSet<OptimizerNode>();
-
- private final int costWeight;
-
- private StaticDynamicPathIdentifier(int costWeight) {
- this.costWeight = costWeight;
- }
-
- @Override
- public boolean preVisit(OptimizerNode visitable) {
- return this.seenBefore.add(visitable);
- }
-
- @Override
- public void postVisit(OptimizerNode visitable) {
- visitable.identifyDynamicPath(this.costWeight);
-
- // check that there is no nested iteration on the dynamic path
- if (visitable.isOnDynamicPath() && visitable instanceof IterationNode) {
- throw new CompilerException("Nested iterations are currently not supported.");
- }
- }
- }
-
- /**
- * Visitor that assigns each node of the optimizer DAG a unique id, initializes the maximum path depths of its
- * incoming and broadcast connections, and computes the output size estimates based on the given statistics.
- */
- public static final class IdAndEstimatesVisitor implements Visitor<OptimizerNode> {
-
- private final DataStatistics statistics;
-
- private int id = 1;
-
- public IdAndEstimatesVisitor(DataStatistics statistics) {
- this.statistics = statistics;
- }
-
- @Override
- public boolean preVisit(OptimizerNode visitable) {
- return visitable.getId() == -1;
- }
-
- @Override
- public void postVisit(OptimizerNode visitable) {
- // the node ids
- visitable.initId(this.id++);
-
- // connections need to figure out their maximum path depths
- for (PactConnection conn : visitable.getIncomingConnections()) {
- conn.initMaxDepth();
- }
- for (PactConnection conn : visitable.getBroadcastConnections()) {
- conn.initMaxDepth();
- }
-
- // the estimates
- visitable.computeOutputEstimates(this.statistics);
-
- // if required, recurse into the step function
- if (visitable instanceof IterationNode) {
- ((IterationNode) visitable).acceptForStepFunction(this);
- }
- }
- }
-
- /**
- * Visitor that computes the interesting properties for each node in the plan. On its recursive
- * depth-first descend, it propagates all interesting properties top-down.
- */
- public static final class InterestingPropertyVisitor implements Visitor<OptimizerNode> {
-
- private CostEstimator estimator; // the cost estimator for maximal costs of an interesting property
-
- /**
- * Creates a new visitor that computes the interesting properties for all nodes in the plan.
- * It uses the given cost estimator used to compute the maximal costs for an interesting property.
- *
- * @param estimator
- * The cost estimator to estimate the maximal costs for interesting properties.
- */
- public InterestingPropertyVisitor(CostEstimator estimator) {
- this.estimator = estimator;
- }
-
- @Override
- public boolean preVisit(OptimizerNode node) {
- // The interesting properties must be computed on the descent. In case a node has multiple outputs,
- // that computation must happen during the last descent.
-
- if (node.getInterestingProperties() == null && node.haveAllOutputConnectionInterestingProperties()) {
- node.computeUnionOfInterestingPropertiesFromSuccessors();
- node.computeInterestingPropertiesForInputs(this.estimator);
- return true;
- } else {
- return false;
- }
- }
-
- @Override
- public void postVisit(OptimizerNode visitable) {}
- }
-
- /**
- * On its re-ascent (post visit), this visitor computes auxiliary maps that are needed to support plans
- * that are not a minimally connected DAG (such plans are not trees, but at least one node feeds its
- * output into more than one other node).
- */
- public static final class BranchesVisitor implements Visitor<OptimizerNode> {
-
- @Override
- public boolean preVisit(OptimizerNode node) {
- return node.getOpenBranches() == null;
- }
-
- @Override
- public void postVisit(OptimizerNode node) {
- if (node instanceof IterationNode) {
- ((IterationNode) node).acceptForStepFunction(this);
- }
-
- node.computeUnclosedBranchStack();
- }
- }
-
- /**
- * Finalization of the plan:
- * - The graph of nodes is double-linked (links from child to parent are inserted)
- * - If unions join static and dynamic paths, the cache is marked as a memory consumer
- * - Relative memory fractions are assigned to all nodes.
- * - All nodes are collected into a set.
- */
- private static final class PlanFinalizer implements Visitor<PlanNode> {
-
- private final Set<PlanNode> allNodes; // a set of all nodes in the optimizer plan
-
- private final List<SourcePlanNode> sources; // all data source nodes in the optimizer plan
-
- private final List<SinkPlanNode> sinks; // all data sink nodes in the optimizer plan
-
- private final Deque<IterationPlanNode> stackOfIterationNodes;
-
- private int memoryConsumerWeights; // a counter of all memory consumers
-
- /**
- * Creates a new plan finalizer.
- */
- private PlanFinalizer() {
- this.allNodes = new HashSet<PlanNode>();
- this.sources = new ArrayList<SourcePlanNode>();
- this.sinks = new ArrayList<SinkPlanNode>();
- this.stackOfIterationNodes = new ArrayDeque<IterationPlanNode>();
- }
-
- private OptimizedPlan createFinalPlan(List<SinkPlanNode> sinks, String jobName, Plan originalPlan) {
- this.memoryConsumerWeights = 0;
-
- // traverse the graph
- for (SinkPlanNode node : sinks) {
- node.accept(this);
- }
-
- // assign the memory to each node
- if (this.memoryConsumerWeights > 0) {
- for (PlanNode node : this.allNodes) {
- // assign memory to the driver strategy of the node
- final int consumerWeight = node.getMemoryConsumerWeight();
- if (consumerWeight > 0) {
- final double relativeMem = (double)consumerWeight / this.memoryConsumerWeights;
- node.setRelativeMemoryPerSubtask(relativeMem);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Assigned " + relativeMem + " of total memory to each subtask of " +
- node.getPactContract().getName() + ".");
- }
- }
-
- // assign memory to the local and global strategies of the channels
- for (Channel c : node.getInputs()) {
- if (c.getLocalStrategy().dams()) {
- final double relativeMem = 1.0 / this.memoryConsumerWeights;
- c.setRelativeMemoryLocalStrategy(relativeMem);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Assigned " + relativeMem + " of total memory to each local strategy " +
- "instance of " + c + ".");
- }
- }
- if (c.getTempMode() != TempMode.NONE) {
- final double relativeMem = 1.0 / this.memoryConsumerWeights;
- c.setRelativeTempMemory(relativeMem);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Assigned " + relativeMem + " of total memory to each instance of the temp " +
- "table for " + c + ".");
- }
- }
- }
- }
- }
- return new OptimizedPlan(this.sources, this.sinks, this.allNodes, jobName, originalPlan);
- }
-
- @Override
- public boolean preVisit(PlanNode visitable) {
- // if we come here again, prevent a further descend
- if (!this.allNodes.add(visitable)) {
- return false;
- }
-
- if (visitable instanceof SinkPlanNode) {
- this.sinks.add((SinkPlanNode) visitable);
- }
- else if (visitable instanceof SourcePlanNode) {
- this.sources.add((SourcePlanNode) visitable);
- }
- else if (visitable instanceof BinaryUnionPlanNode) {
- BinaryUnionPlanNode unionNode = (BinaryUnionPlanNode) visitable;
- if (unionNode.unionsStaticAndDynamicPath()) {
- unionNode.setDriverStrategy(DriverStrategy.UNION_WITH_CACHED);
- }
- }
- else if (visitable instanceof BulkPartialSolutionPlanNode) {
- // tell the partial solution about the iteration node that contains it
- final BulkPartialSolutionPlanNode pspn = (BulkPartialSolutionPlanNode) visitable;
- final IterationPlanNode iteration = this.stackOfIterationNodes.peekLast();
-
- // sanity check!
- if (iteration == null || !(iteration instanceof BulkIterationPlanNode)) {
- throw new CompilerException("Bug: Error finalizing the plan. " +
- "Cannot associate the node for a partial solutions with its containing iteration.");
- }
- pspn.setContainingIterationNode((BulkIterationPlanNode) iteration);
- }
- else if (visitable instanceof WorksetPlanNode) {
- // tell the partial solution about the iteration node that contains it
- final WorksetPlanNode wspn = (WorksetPlanNode) visitable;
- final IterationPlanNode iteration = this.stackOfIterationNodes.peekLast();
-
- // sanity check!
- if (iteration == null || !(iteration instanceof WorksetIterationPlanNode)) {
- throw new CompilerException("Bug: Error finalizing the plan. " +
- "Cannot associate the node for a partial solutions with its containing iteration.");
- }
- wspn.setContainingIterationNode((WorksetIterationPlanNode) iteration);
- }
- else if (visitable instanceof SolutionSetPlanNode) {
- // tell the partial solution about the iteration node that contains it
- final SolutionSetPlanNode sspn = (SolutionSetPlanNode) visitable;
- final IterationPlanNode iteration = this.stackOfIterationNodes.peekLast();
-
- // sanity check!
- if (iteration == null || !(iteration instanceof WorksetIterationPlanNode)) {
- throw new CompilerException("Bug: Error finalizing the plan. " +
- "Cannot associate the node for a partial solutions with its containing iteration.");
- }
- sspn.setContainingIterationNode((WorksetIterationPlanNode) iteration);
- }
-
- // double-connect the connections. previously, only parents knew their children, because
- // one child candidate could have been referenced by multiple parents.
- for (Channel conn : visitable.getInputs()) {
- conn.setTarget(visitable);
- conn.getSource().addOutgoingChannel(conn);
- }
-
- for (Channel c : visitable.getBroadcastInputs()) {
- c.setTarget(visitable);
- c.getSource().addOutgoingChannel(c);
- }
-
- // count the memory consumption
- this.memoryConsumerWeights += visitable.getMemoryConsumerWeight();
- for (Channel c : visitable.getInputs()) {
- if (c.getLocalStrategy().dams()) {
- this.memoryConsumerWeights++;
- }
- if (c.getTempMode() != TempMode.NONE) {
- this.memoryConsumerWeights++;
- }
- }
- for (Channel c : visitable.getBroadcastInputs()) {
- if (c.getLocalStrategy().dams()) {
- this.memoryConsumerWeights++;
- }
- if (c.getTempMode() != TempMode.NONE) {
- this.memoryConsumerWeights++;
- }
- }
-
- // pass the visitor to the iteration's step function
- if (visitable instanceof IterationPlanNode) {
- // push the iteration node onto the stack
- final IterationPlanNode iterNode = (IterationPlanNode) visitable;
- this.stackOfIterationNodes.addLast(iterNode);
-
- // recurse
- ((IterationPlanNode) visitable).acceptForStepFunction(this);
-
- // pop the iteration node from the stack
- this.stackOfIterationNodes.removeLast();
- }
- return true;
- }
-
- @Override
- public void postVisit(PlanNode visitable) {}
- }
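To make the relative memory assignment above concrete, a small worked sketch with hypothetical consumer weights (the concrete weight values are assumptions; only the fraction formula relativeMem = consumerWeight / memoryConsumerWeights comes from the code above):

    // Hypothetical weights: a hash-join driver (2), a sorting reduce driver (1), one damming channel (1).
    int totalWeight = 2 + 1 + 1;                       // memoryConsumerWeights = 4
    double joinFraction    = 2.0 / totalWeight;        // 0.50 of the managed memory per subtask
    double reduceFraction  = 1.0 / totalWeight;        // 0.25
    double channelFraction = 1.0 / totalWeight;        // 0.25 for the local strategy of the damming channel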
-
- /**
- * A visitor that traverses the graph and collects cascading binary unions into a single n-ary
- * union operator. The exception is when one of the union inputs is materialized, such as in the
- * static-code-path-cache in iterations.
- */
- private static final class BinaryUnionReplacer implements Visitor<PlanNode> {
-
- private final Set<PlanNode> seenBefore = new HashSet<PlanNode>();
-
- @Override
- public boolean preVisit(PlanNode visitable) {
- if (this.seenBefore.add(visitable)) {
- if (visitable instanceof IterationPlanNode) {
- ((IterationPlanNode) visitable).acceptForStepFunction(this);
- }
- return true;
- } else {
- return false;
- }
- }
-
- @Override
- public void postVisit(PlanNode visitable) {
-
- if (visitable instanceof BinaryUnionPlanNode) {
-
- final BinaryUnionPlanNode unionNode = (BinaryUnionPlanNode) visitable;
- final Channel in1 = unionNode.getInput1();
- final Channel in2 = unionNode.getInput2();
-
- if (!unionNode.unionsStaticAndDynamicPath()) {
-
- // both on static path, or both on dynamic path. we can collapse them
- NAryUnionPlanNode newUnionNode;
-
- List<Channel> inputs = new ArrayList<Channel>();
- collect(in1, inputs);
- collect(in2, inputs);
-
- newUnionNode = new NAryUnionPlanNode(unionNode.getOptimizerNode(), inputs,
- unionNode.getGlobalProperties(), unionNode.getCumulativeCosts());
-
- newUnionNode.setDegreeOfParallelism(unionNode.getDegreeOfParallelism());
-
- for (Channel c : inputs) {
- c.setTarget(newUnionNode);
- }
-
- for (Channel channel : unionNode.getOutgoingChannels()) {
- channel.swapUnionNodes(newUnionNode);
- newUnionNode.addOutgoingChannel(channel);
- }
- }
- else {
- // union between the static and the dynamic path. we need to handle this for now
- // through a special union operator
-
- // make sure that the first input is the cached (static) and the second input is the dynamic
- if (in1.isOnDynamicPath()) {
- BinaryUnionPlanNode newUnionNode = new BinaryUnionPlanNode(unionNode);
-
- in1.setTarget(newUnionNode);
- in2.setTarget(newUnionNode);
-
- for (Channel channel : unionNode.getOutgoingChannels()) {
- channel.swapUnionNodes(newUnionNode);
- newUnionNode.addOutgoingChannel(channel);
- }
- }
- }
- }
- }
-
- private void collect(Channel in, List<Channel> inputs) {
- if (in.getSource() instanceof NAryUnionPlanNode) {
- // sanity check
- if (in.getShipStrategy() != ShipStrategyType.FORWARD) {
- throw new CompilerException("Bug: Plan generation for Unions picked a ship strategy between binary plan operators.");
- }
- if (!(in.getLocalStrategy() == null || in.getLocalStrategy() == LocalStrategy.NONE)) {
- throw new CompilerException("Bug: Plan generation for Unions picked a local strategy between binary plan operators.");
- }
-
- inputs.addAll(((NAryUnionPlanNode) in.getSource()).getListOfInputs());
- } else {
- // is not a collapsed union node, so we take the channel directly
- inputs.add(in);
- }
- }
- }
-
- private static final class StepFunctionValidator implements Visitor<Operator<?>> {
-
- private final Set<Operator<?>> seenBefore = new HashSet<Operator<?>>();
-
- private boolean foundWorkset;
-
- @Override
- public boolean preVisit(Operator<?> visitable) {
- if (visitable instanceof WorksetPlaceHolder) {
- foundWorkset = true;
- }
-
- return (!foundWorkset) && seenBefore.add(visitable);
- }
-
- @Override
- public void postVisit(Operator<?> visitable) {}
- }
-
- // ------------------------------------------------------------------------
- // Miscellaneous
- // ------------------------------------------------------------------------
-
- private OptimizerPostPass getPostPassFromPlan(Plan program) {
- final String className = program.getPostPassClassName();
- if (className == null) {
- throw new CompilerException("Optimizer Post Pass class description is null");
- }
- try {
- Class<? extends OptimizerPostPass> clazz = Class.forName(className).asSubclass(OptimizerPostPass.class);
- try {
- return InstantiationUtil.instantiate(clazz, OptimizerPostPass.class);
- } catch (RuntimeException rtex) {
- // unwrap the source exception
- if (rtex.getCause() != null) {
- throw new CompilerException("Cannot instantiate optimizer post pass: " + rtex.getMessage(), rtex.getCause());
- } else {
- throw rtex;
- }
- }
- } catch (ClassNotFoundException cnfex) {
- throw new CompilerException("Cannot load Optimizer post-pass class '" + className + "'.", cnfex);
- } catch (ClassCastException ccex) {
- throw new CompilerException("Class '" + className + "' is not an optimizer post passer.", ccex);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/AbstractPartialSolutionNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/AbstractPartialSolutionNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/AbstractPartialSolutionNode.java
index f8404c4..d199ae7 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/AbstractPartialSolutionNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/AbstractPartialSolutionNode.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.optimizer.dag;
import java.util.Collections;
@@ -62,8 +61,8 @@ public abstract class AbstractPartialSolutionNode extends OptimizerNode {
}
@Override
- public List<PactConnection> getIncomingConnections() {
- return Collections.<PactConnection>emptyList();
+ public List<DagConnection> getIncomingConnections() {
+ return Collections.emptyList();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
index 8aed10c..068799e 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
@@ -107,7 +107,7 @@ public class BinaryUnionNode extends TwoInputNode {
final List<? extends PlanNode> subPlans1 = getFirstPredecessorNode().getAlternativePlans(estimator);
final List<? extends PlanNode> subPlans2 = getSecondPredecessorNode().getAlternativePlans(estimator);
- List<PactConnection> broadcastConnections = getBroadcastConnections();
+ List<DagConnection> broadcastConnections = getBroadcastConnections();
if (broadcastConnections != null && broadcastConnections.size() > 0) {
throw new CompilerException("Found BroadcastVariables on a Union operation");
}
@@ -122,9 +122,9 @@ public class BinaryUnionNode extends TwoInputNode {
final ExecutionMode input1Mode = this.input1.getDataExchangeMode();
final ExecutionMode input2Mode = this.input2.getDataExchangeMode();
- final int dop = getDegreeOfParallelism();
- final int inDop1 = getFirstPredecessorNode().getDegreeOfParallelism();
- final int inDop2 = getSecondPredecessorNode().getDegreeOfParallelism();
+ final int dop = getParallelism();
+ final int inDop1 = getFirstPredecessorNode().getParallelism();
+ final int inDop2 = getSecondPredecessorNode().getParallelism();
final boolean dopChange1 = dop != inDop1;
final boolean dopChange2 = dop != inDop2;
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
index 8112748..c55d17a 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
@@ -31,7 +31,7 @@ import org.apache.flink.api.common.operators.base.BulkIterationBase;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.PactCompiler.InterestingPropertyVisitor;
+import org.apache.flink.optimizer.Optimizer.InterestingPropertyVisitor;
import org.apache.flink.optimizer.costs.CostEstimator;
import org.apache.flink.optimizer.dag.WorksetIterationNode.SingleRootJoiner;
import org.apache.flink.optimizer.dataproperties.GlobalProperties;
@@ -62,9 +62,9 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
private OptimizerNode nextPartialSolution;
- private PactConnection rootConnection; // connection out of the next partial solution
+ private DagConnection rootConnection; // connection out of the next partial solution
- private PactConnection terminationCriterionRootConnection; // connection out of the term. criterion
+ private DagConnection terminationCriterionRootConnection; // connection out of the term. criterion
private OptimizerNode singleRoot;
@@ -93,7 +93,7 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
// --------------------------------------------------------------------------------------------
public BulkIterationBase<?> getIterationContract() {
- return (BulkIterationBase<?>) getPactContract();
+ return (BulkIterationBase<?>) getOperator();
}
/**
@@ -133,14 +133,14 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
// check if the root of the step function has the same DOP as the iteration
// or if the step function has any operator at all
- if (nextPartialSolution.getDegreeOfParallelism() != getDegreeOfParallelism() ||
+ if (nextPartialSolution.getParallelism() != getParallelism() ||
nextPartialSolution == partialSolution || nextPartialSolution instanceof BinaryUnionNode)
{
// add a no-op to the root to express the re-partitioning
NoOpNode noop = new NoOpNode();
- noop.setDegreeOfParallelism(getDegreeOfParallelism());
+ noop.setDegreeOfParallelism(getParallelism());
- PactConnection noOpConn = new PactConnection(nextPartialSolution, noop, ExecutionMode.PIPELINED);
+ DagConnection noOpConn = new DagConnection(nextPartialSolution, noop, ExecutionMode.PIPELINED);
noop.setIncomingConnection(noOpConn);
nextPartialSolution.addOutgoingConnection(noOpConn);
@@ -152,13 +152,13 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
if (terminationCriterion == null) {
this.singleRoot = nextPartialSolution;
- this.rootConnection = new PactConnection(nextPartialSolution, ExecutionMode.PIPELINED);
+ this.rootConnection = new DagConnection(nextPartialSolution, ExecutionMode.PIPELINED);
}
else {
// we have a termination criterion
SingleRootJoiner singleRootJoiner = new SingleRootJoiner();
- this.rootConnection = new PactConnection(nextPartialSolution, singleRootJoiner, ExecutionMode.PIPELINED);
- this.terminationCriterionRootConnection = new PactConnection(terminationCriterion, singleRootJoiner,
+ this.rootConnection = new DagConnection(nextPartialSolution, singleRootJoiner, ExecutionMode.PIPELINED);
+ this.terminationCriterionRootConnection = new DagConnection(terminationCriterion, singleRootJoiner,
ExecutionMode.PIPELINED);
singleRootJoiner.setInputs(this.rootConnection, this.terminationCriterionRootConnection);
@@ -323,7 +323,7 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
locPropsReq.parameterizeChannel(toNoOp);
UnaryOperatorNode rebuildPropertiesNode = new UnaryOperatorNode("Rebuild Partial Solution Properties", FieldList.EMPTY_LIST);
- rebuildPropertiesNode.setDegreeOfParallelism(candidate.getDegreeOfParallelism());
+ rebuildPropertiesNode.setDegreeOfParallelism(candidate.getParallelism());
SingleInputPlanNode rebuildPropertiesPlanNode = new SingleInputPlanNode(rebuildPropertiesNode, "Rebuild Partial Solution Properties", toNoOp, DriverStrategy.UNARY_NO_OP);
rebuildPropertiesPlanNode.initProperties(toNoOp.getGlobalProperties(), toNoOp.getLocalProperties());
@@ -352,7 +352,7 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
// 5) Create a candidate for the Iteration Node for every remaining plan of the step function.
if (terminationCriterion == null) {
for (PlanNode candidate : candidates) {
- BulkIterationPlanNode node = new BulkIterationPlanNode(this, "BulkIteration ("+this.getPactContract().getName()+")", in, pspn, candidate);
+ BulkIterationPlanNode node = new BulkIterationPlanNode(this, "BulkIteration ("+this.getOperator().getName()+")", in, pspn, candidate);
GlobalProperties gProps = candidate.getGlobalProperties().clone();
LocalProperties lProps = candidate.getLocalProperties().clone();
node.initProperties(gProps, lProps);
@@ -367,7 +367,7 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
for (PlanNode candidate : candidates) {
for (PlanNode terminationCandidate : terminationCriterionCandidates) {
if (singleRoot.areBranchCompatible(candidate, terminationCandidate)) {
- BulkIterationPlanNode node = new BulkIterationPlanNode(this, "BulkIteration ("+this.getPactContract().getName()+")", in, pspn, candidate, terminationCandidate);
+ BulkIterationPlanNode node = new BulkIterationPlanNode(this, "BulkIteration ("+this.getOperator().getName()+")", in, pspn, candidate, terminationCandidate);
GlobalProperties gProps = candidate.getGlobalProperties().clone();
LocalProperties lProps = candidate.getLocalProperties().clone();
node.initProperties(gProps, lProps);
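
The hunk above keeps the existing logic: when the step-function root runs at a different parallelism than the iteration (or is the partial solution itself, or a binary union), a NoOpNode is spliced in and wired with a connection. A hedged sketch of that wiring with the renamed DagConnection; the variables stand in for the BulkIterationNode fields and getters used above.

    // Hedged sketch of the no-op splice shown above, using the renamed classes.
    // `nextPartialSolution` and `iterationParallelism` are placeholders for the
    // node's fields/getters; only the wiring pattern is the point here.
    NoOpNode noop = new NoOpNode();
    noop.setDegreeOfParallelism(iterationParallelism);

    DagConnection noOpConn = new DagConnection(nextPartialSolution, noop, ExecutionMode.PIPELINED);
    noop.setIncomingConnection(noOpConn);                 // no-op consumes the old step-function root
    nextPartialSolution.addOutgoingConnection(noOpConn);  // old root now feeds the no-op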
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java
index a6e03ff..25a7eef 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.optimizer.dag;
import java.util.Collections;
@@ -49,7 +48,8 @@ public class BulkPartialSolutionNode extends AbstractPartialSolutionNode {
if (this.cachedPlans != null) {
throw new IllegalStateException();
} else {
- this.cachedPlans = Collections.<PlanNode>singletonList(new BulkPartialSolutionPlanNode(this, "PartialSolution ("+this.getPactContract().getName()+")", gProps, lProps, initialInput));
+ this.cachedPlans = Collections.<PlanNode>singletonList(new BulkPartialSolutionPlanNode(this,
+ "PartialSolution ("+this.getOperator().getName()+")", gProps, lProps, initialInput));
}
}
@@ -73,13 +73,14 @@ public class BulkPartialSolutionNode extends AbstractPartialSolutionNode {
// --------------------------------------------------------------------------------------------
/**
- * Gets the contract object for this data source node.
+ * Gets the operator (here the {@link PartialSolutionPlaceHolder}) that is represented by this
+ * optimizer node.
*
- * @return The contract.
+ * @return The operator represented by this optimizer node.
*/
@Override
- public PartialSolutionPlaceHolder<?> getPactContract() {
- return (PartialSolutionPlaceHolder<?>) super.getPactContract();
+ public PartialSolutionPlaceHolder<?> getOperator() {
+ return (PartialSolutionPlaceHolder<?>) super.getOperator();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java
index 7c0cc9a..92076c3 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java
@@ -50,8 +50,8 @@ public class CoGroupNode extends TwoInputNode {
* @return The CoGroup operator.
*/
@Override
- public CoGroupOperatorBase<?, ?, ?, ?> getPactContract() {
- return (CoGroupOperatorBase<?, ?, ?, ?>) super.getPactContract();
+ public CoGroupOperatorBase<?, ?, ?, ?> getOperator() {
+ return (CoGroupOperatorBase<?, ?, ?, ?>) super.getOperator();
}
@Override
@@ -85,7 +85,7 @@ public class CoGroupNode extends TwoInputNode {
Ordering groupOrder1 = null;
Ordering groupOrder2 = null;
- CoGroupOperatorBase<?, ?, ?, ?> cgc = getPactContract();
+ CoGroupOperatorBase<?, ?, ?, ?> cgc = getOperator();
groupOrder1 = cgc.getGroupOrderForInputOne();
groupOrder2 = cgc.getGroupOrderForInputTwo();
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java
index afeed1d..8de67e8 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.common.operators.base.CrossOperatorBase;
import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.operators.CrossBlockOuterFirstDescriptor;
import org.apache.flink.optimizer.operators.CrossBlockOuterSecondDescriptor;
import org.apache.flink.optimizer.operators.CrossStreamOuterFirstDescriptor;
@@ -50,7 +50,7 @@ public class CrossNode extends TwoInputNode {
super(operation);
Configuration conf = operation.getParameters();
- String localStrategy = conf.getString(PactCompiler.HINT_LOCAL_STRATEGY, null);
+ String localStrategy = conf.getString(Optimizer.HINT_LOCAL_STRATEGY, null);
CrossHint hint = operation.getCrossHint();
@@ -60,13 +60,13 @@ public class CrossNode extends TwoInputNode {
final boolean allowBCsecond = hint != CrossHint.FIRST_IS_SMALL;
final OperatorDescriptorDual fixedDriverStrat;
- if (PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST.equals(localStrategy)) {
+ if (Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST.equals(localStrategy)) {
fixedDriverStrat = new CrossBlockOuterFirstDescriptor(allowBCfirst, allowBCsecond);
- } else if (PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND.equals(localStrategy)) {
+ } else if (Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND.equals(localStrategy)) {
fixedDriverStrat = new CrossBlockOuterSecondDescriptor(allowBCfirst, allowBCsecond);
- } else if (PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST.equals(localStrategy)) {
+ } else if (Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST.equals(localStrategy)) {
fixedDriverStrat = new CrossStreamOuterFirstDescriptor(allowBCfirst, allowBCsecond);
- } else if (PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND.equals(localStrategy)) {
+ } else if (Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND.equals(localStrategy)) {
fixedDriverStrat = new CrossStreamOuterSecondDescriptor(allowBCfirst, allowBCsecond);
} else {
throw new CompilerException("Invalid local strategy hint for cross contract: " + localStrategy);
@@ -99,8 +99,8 @@ public class CrossNode extends TwoInputNode {
// ------------------------------------------------------------------------
@Override
- public CrossOperatorBase<?, ?, ?, ?> getPactContract() {
- return (CrossOperatorBase<?, ?, ?, ?>) super.getPactContract();
+ public CrossOperatorBase<?, ?, ?, ?> getOperator() {
+ return (CrossOperatorBase<?, ?, ?, ?>) super.getOperator();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java
new file mode 100644
index 0000000..360f579
--- /dev/null
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.optimizer.dataproperties.InterestingProperties;
+import org.apache.flink.optimizer.plandump.DumpableConnection;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+
+/**
+ * A connection between two operators. Represents an intermediate result
+ * and a data exchange between the two operators.
+ *
+ * The data exchange happens in a particular mode (batch / pipelined).
+ *
+ * The data exchange strategy may be set on this connection, in which case
+ * it is fixed and will not be determined during candidate plan enumeration.
+ *
+ * During the enumeration of interesting properties, this connection also holds
+ * all interesting properties generated by the successor operator.
+ */
+public class DagConnection implements EstimateProvider, DumpableConnection<OptimizerNode> {
+
+ private final OptimizerNode source; // The source node of the connection
+
+ private final OptimizerNode target; // The target node of the connection.
+
+ private final ExecutionMode dataExchangeMode; // defines whether to use batch or pipelined data exchange
+
+ private InterestingProperties interestingProps; // local properties that succeeding nodes are interested in
+
+ private ShipStrategyType shipStrategy; // The data shipping strategy, if predefined.
+
+ private TempMode materializationMode = TempMode.NONE; // the materialization mode
+
+ private int maxDepth = -1;
+
+ private boolean breakPipeline; // whether this connection should break the pipeline due to potential deadlocks
+
+ /**
+ * Creates a new Connection between two nodes. The ship strategy is initially undefined
+ * and determined during plan enumeration. The temp mode is by default <tt>NONE</tt>.
+ *
+ * @param source
+ * The source node.
+ * @param target
+ * The target node.
+ */
+ public DagConnection(OptimizerNode source, OptimizerNode target, ExecutionMode exchangeMode) {
+ this(source, target, null, exchangeMode);
+ }
+
+ /**
+ * Creates a new Connection between two nodes.
+ *
+ * @param source
+ * The source node.
+ * @param target
+ * The target node.
+ * @param shipStrategy
+ * The shipping strategy.
+ * @param exchangeMode
+ * The data exchange mode (pipelined / batch / batch only for shuffles / ... )
+ */
+ public DagConnection(OptimizerNode source, OptimizerNode target,
+ ShipStrategyType shipStrategy, ExecutionMode exchangeMode)
+ {
+ if (source == null || target == null) {
+ throw new NullPointerException("Source and target must not be null.");
+ }
+ this.source = source;
+ this.target = target;
+ this.shipStrategy = shipStrategy;
+ this.dataExchangeMode = exchangeMode;
+ }
+
+ /**
+ * Constructor to create a result from an operator that is not
+ * consumed by another operator.
+ *
+ * @param source
+ * The source node.
+ */
+ public DagConnection(OptimizerNode source, ExecutionMode exchangeMode) {
+ if (source == null) {
+ throw new NullPointerException("Source must not be null.");
+ }
+ this.source = source;
+ this.target = null;
+ this.shipStrategy = ShipStrategyType.NONE;
+ this.dataExchangeMode = exchangeMode;
+ }
+
+ /**
+ * Gets the source of the connection.
+ *
+ * @return The source Node.
+ */
+ public OptimizerNode getSource() {
+ return this.source;
+ }
+
+ /**
+ * Gets the target of the connection.
+ *
+ * @return The target node.
+ */
+ public OptimizerNode getTarget() {
+ return this.target;
+ }
+
+ /**
+ * Gets the shipping strategy for this connection.
+ *
+ * @return The connection's shipping strategy.
+ */
+ public ShipStrategyType getShipStrategy() {
+ return this.shipStrategy;
+ }
+
+ /**
+ * Sets the shipping strategy for this connection.
+ *
+ * @param strategy
+ * The shipping strategy to be applied to this connection.
+ */
+ public void setShipStrategy(ShipStrategyType strategy) {
+ this.shipStrategy = strategy;
+ }
+
+ /**
+ * Gets the data exchange mode to use for this connection.
+ *
+ * @return The data exchange mode to use for this connection.
+ */
+ public ExecutionMode getDataExchangeMode() {
+ if (dataExchangeMode == null) {
+ throw new IllegalStateException("This connection does not have the data exchange mode set");
+ }
+ return dataExchangeMode;
+ }
+
+ /**
+ * Marks that this connection should do a decoupled data exchange (such as batch)
+ * rather than pipelining the data. Connections are marked as pipeline breakers to avoid
+ * deadlock situations.
+ */
+ public void markBreaksPipeline() {
+ this.breakPipeline = true;
+ }
+
+ /**
+ * Checks whether this connection is marked to break the pipeline.
+ *
+ * @return True, if this connection is marked to break the pipeline, false otherwise.
+ */
+ public boolean isBreakingPipeline() {
+ return this.breakPipeline;
+ }
+
+ /**
+ * Gets the interesting properties object for this connection.
+ * If the interesting properties for this connection have not yet been set,
+ * this method returns null.
+ *
+ * @return The collection of all interesting properties, or null, if they have not yet been set.
+ */
+ public InterestingProperties getInterestingProperties() {
+ return this.interestingProps;
+ }
+
+ /**
+ * Sets the interesting properties for this connection.
+ *
+ * @param props The interesting properties.
+ */
+ public void setInterestingProperties(InterestingProperties props) {
+ if (this.interestingProps == null) {
+ this.interestingProps = props;
+ } else {
+ throw new IllegalStateException("Interesting Properties have already been set.");
+ }
+ }
+
+ public void clearInterestingProperties() {
+ this.interestingProps = null;
+ }
+
+ public void initMaxDepth() {
+
+ if (this.maxDepth == -1) {
+ this.maxDepth = this.source.getMaxDepth() + 1;
+ } else {
+ throw new IllegalStateException("Maximum path depth has already been initialized.");
+ }
+ }
+
+ public int getMaxDepth() {
+ if (this.maxDepth != -1) {
+ return this.maxDepth;
+ } else {
+ throw new IllegalStateException("Maximum path depth has not been initialized.");
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Estimates
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public long getEstimatedOutputSize() {
+ return this.source.getEstimatedOutputSize();
+ }
+
+ @Override
+ public long getEstimatedNumRecords() {
+ return this.source.getEstimatedNumRecords();
+ }
+
+ @Override
+ public float getEstimatedAvgWidthPerOutputRecord() {
+ return this.source.getEstimatedAvgWidthPerOutputRecord();
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+
+ public TempMode getMaterializationMode() {
+ return this.materializationMode;
+ }
+
+ public void setMaterializationMode(TempMode materializationMode) {
+ this.materializationMode = materializationMode;
+ }
+
+ public boolean isOnDynamicPath() {
+ return this.source.isOnDynamicPath();
+ }
+
+ public int getCostWeight() {
+ return this.source.getCostWeight();
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public String toString() {
+ StringBuilder buf = new StringBuilder(50);
+ buf.append("Connection: ");
+
+ if (this.source == null) {
+ buf.append("null");
+ } else {
+ buf.append(this.source.getOperator().getName());
+ buf.append('(').append(this.source.getName()).append(')');
+ }
+
+ buf.append(" -> ");
+
+ if (this.shipStrategy != null) {
+ buf.append('[');
+ buf.append(this.shipStrategy.name());
+ buf.append(']').append(' ');
+ }
+
+ if (this.target == null) {
+ buf.append("null");
+ } else {
+ buf.append(this.target.getOperator().getName());
+ buf.append('(').append(this.target.getName()).append(')');
+ }
+
+ return buf.toString();
+ }
+}
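
The new DagConnection above is the renamed PactConnection: it ties two optimizer nodes together, records the data exchange mode, and optionally pins a ship strategy. A minimal usage sketch based only on the constructors and setters defined above; `sourceNode` and `targetNode` are hypothetical placeholders for concrete OptimizerNode instances.

    // Hedged sketch: wiring two optimizer nodes with the new DagConnection.
    // `sourceNode` and `targetNode` stand for concrete OptimizerNode subclasses
    // (e.g. a map node feeding a sink node); they are not constructed here.
    DagConnection conn = new DagConnection(sourceNode, targetNode, ExecutionMode.PIPELINED);

    // A ship strategy may be fixed up front; otherwise it stays open and the
    // optimizer chooses one during candidate plan enumeration.
    conn.setShipStrategy(ShipStrategyType.FORWARD);

    // Connections can be marked as pipeline breakers to avoid deadlock situations.
    conn.markBreaksPipeline();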
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
index 9e4f457..dbe04f4 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
@@ -46,7 +46,7 @@ import org.apache.flink.util.Visitor;
*/
public class DataSinkNode extends OptimizerNode {
- protected PactConnection input; // The input edge
+ protected DagConnection input; // The input edge
/**
* Creates a new DataSinkNode for the given sink operator.
@@ -64,7 +64,7 @@ public class DataSinkNode extends OptimizerNode {
*
* @return The input connection.
*/
- public PactConnection getInputConnection() {
+ public DagConnection getInputConnection() {
return this.input;
}
@@ -87,8 +87,8 @@ public class DataSinkNode extends OptimizerNode {
* @return The node's underlying operator.
*/
@Override
- public GenericDataSinkBase<?> getPactContract() {
- return (GenericDataSinkBase<?>) super.getPactContract();
+ public GenericDataSinkBase<?> getOperator() {
+ return (GenericDataSinkBase<?>) super.getOperator();
}
@Override
@@ -97,7 +97,7 @@ public class DataSinkNode extends OptimizerNode {
}
@Override
- public List<PactConnection> getIncomingConnections() {
+ public List<DagConnection> getIncomingConnections() {
return Collections.singletonList(this.input);
}
@@ -107,19 +107,19 @@ public class DataSinkNode extends OptimizerNode {
* @return An empty list.
*/
@Override
- public List<PactConnection> getOutgoingConnections() {
+ public List<DagConnection> getOutgoingConnections() {
return Collections.emptyList();
}
@Override
public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, ExecutionMode defaultExchangeMode) {
- Operator<?> children = getPactContract().getInput();
+ Operator<?> children = getOperator().getInput();
final OptimizerNode pred;
- final PactConnection conn;
+ final DagConnection conn;
pred = contractToNode.get(children);
- conn = new PactConnection(pred, this, defaultExchangeMode);
+ conn = new DagConnection(pred, this, defaultExchangeMode);
// create the connection and add it
this.input = conn;
@@ -141,8 +141,8 @@ public class DataSinkNode extends OptimizerNode {
final InterestingProperties iProps = new InterestingProperties();
{
- final Ordering partitioning = getPactContract().getPartitionOrdering();
- final DataDistribution dataDist = getPactContract().getDataDistribution();
+ final Ordering partitioning = getOperator().getPartitionOrdering();
+ final DataDistribution dataDist = getOperator().getDataDistribution();
final RequestedGlobalProperties partitioningProps = new RequestedGlobalProperties();
if (partitioning != null) {
if(dataDist != null) {
@@ -156,7 +156,7 @@ public class DataSinkNode extends OptimizerNode {
}
{
- final Ordering localOrder = getPactContract().getLocalOrder();
+ final Ordering localOrder = getOperator().getLocalOrder();
final RequestedLocalProperties orderProps = new RequestedLocalProperties();
if (localOrder != null) {
orderProps.setOrdering(localOrder);
@@ -184,7 +184,7 @@ public class DataSinkNode extends OptimizerNode {
}
@Override
- protected List<UnclosedBranchDescriptor> getBranchesForParent(PactConnection parent) {
+ protected List<UnclosedBranchDescriptor> getBranchesForParent(DagConnection parent) {
// return our own stack of open branches, because nothing is added
return this.openBranches;
}
@@ -204,8 +204,8 @@ public class DataSinkNode extends OptimizerNode {
List<? extends PlanNode> subPlans = getPredecessorNode().getAlternativePlans(estimator);
List<PlanNode> outputPlans = new ArrayList<PlanNode>();
- final int dop = getDegreeOfParallelism();
- final int inDop = getPredecessorNode().getDegreeOfParallelism();
+ final int dop = getParallelism();
+ final int inDop = getPredecessorNode().getParallelism();
final ExecutionMode executionMode = this.input.getDataExchangeMode();
final boolean dopChange = dop != inDop;
@@ -224,7 +224,7 @@ public class DataSinkNode extends OptimizerNode {
// no need to check whether the created properties meet what we need in case
// of ordering or global ordering, because the only interesting properties we have
// are what we require
- outputPlans.add(new SinkPlanNode(this, "DataSink ("+this.getPactContract().getName()+")" ,c));
+ outputPlans.add(new SinkPlanNode(this, "DataSink ("+this.getOperator().getName()+")" ,c));
}
}
}