Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:07:15 UTC
[36/53] [abbrv] flink git commit: [optimizer] Rename optimizer
project to "flink-optimizer" (previously flink-compiler)
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TempMode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TempMode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TempMode.java
deleted file mode 100644
index 0d1dfc9..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TempMode.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-/**
- * Enumeration indicating how the data that flows across a connection is temporarily materialized.
- * Introducing such an artificial dam is sometimes necessary to prevent certain data flows from
- * deadlocking themselves, or to cache an intermediate result so that it can be replayed.
- */
-public enum TempMode {
-
- NONE(false, false),
- PIPELINE_BREAKER(false, true),
- CACHED(true, false),
- CACHING_PIPELINE_BREAKER(true, true);
-
- // --------------------------------------------------------------------------------------------
-
- private final boolean cached;
-
- private final boolean breaksPipeline;
-
-
- private TempMode(boolean cached, boolean breaksPipeline) {
- this.cached = cached;
- this.breaksPipeline = breaksPipeline;
- }
-
- public boolean isCached() {
- return cached;
- }
-
- public boolean breaksPipeline() {
- return breaksPipeline;
- }
-
- public TempMode makePipelineBreaker() {
- if (this == NONE) {
- return PIPELINE_BREAKER;
- } else if (this == CACHED) {
- return CACHING_PIPELINE_BREAKER;
- } else {
- return this;
- }
- }
-
- public TempMode makeCached() {
- if (this == NONE) {
- return CACHED;
- } else if (this == PIPELINE_BREAKER) {
- return CACHING_PIPELINE_BREAKER;
- } else {
- return this;
- }
- }
-
-
- public TempMode makeNonCached() {
- if (this == CACHED) {
- return NONE;
- } else if (this == CACHING_PIPELINE_BREAKER) {
- return PIPELINE_BREAKER;
- } else {
- return this;
- }
- }
-}
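For illustration, a minimal sketch of how the TempMode transitions above compose. It assumes only the enum as relocated to the flink-optimizer project; the driver class is hypothetical.

import org.apache.flink.optimizer.dag.TempMode;

public class TempModeSketch {

    public static void main(String[] args) {
        TempMode mode = TempMode.NONE;

        // request an artificial dam: NONE -> PIPELINE_BREAKER
        mode = mode.makePipelineBreaker();
        System.out.println(mode + ", breaksPipeline=" + mode.breaksPipeline());

        // additionally cache for replay: PIPELINE_BREAKER -> CACHING_PIPELINE_BREAKER
        mode = mode.makeCached();
        System.out.println(mode + ", cached=" + mode.isCached());

        // dropping the cache keeps the dam: CACHING_PIPELINE_BREAKER -> PIPELINE_BREAKER
        mode = mode.makeNonCached();
        System.out.println(mode);
    }
}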
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java
deleted file mode 100644
index 39da165..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java
+++ /dev/null
@@ -1,747 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.dag;
-
-import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
-import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
-import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.common.operators.DualInputOperator;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.costs.CostEstimator;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.InterestingProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
-import org.apache.flink.optimizer.operators.OperatorDescriptorDual.GlobalPropertiesPair;
-import org.apache.flink.optimizer.operators.OperatorDescriptorDual.LocalPropertiesPair;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.NamedChannel;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.io.network.DataExchangeMode;
-import org.apache.flink.runtime.operators.DamBehavior;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.util.Visitor;
-
-import com.google.common.collect.Sets;
-
-/**
- * A node in the optimizer plan that represents a PACT with two different inputs, such as MATCH or CROSS.
- * The two inputs are not interchangeable with respect to their sides.
- */
-public abstract class TwoInputNode extends OptimizerNode {
-
- protected final FieldList keys1; // The set of key fields for the first input
-
- protected final FieldList keys2; // The set of key fields for the second input
-
- protected DagConnection input1; // The first input edge
-
- protected DagConnection input2; // The second input edge
-
- private List<OperatorDescriptorDual> cachedDescriptors;
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Creates a new two-input node for the optimizer plan.
- *
- * @param pactContract
- * The PACT that the node represents.
- */
- public TwoInputNode(DualInputOperator<?, ?, ?, ?> pactContract) {
- super(pactContract);
-
- int[] k1 = pactContract.getKeyColumns(0);
- int[] k2 = pactContract.getKeyColumns(1);
-
- this.keys1 = k1 == null || k1.length == 0 ? null : new FieldList(k1);
- this.keys2 = k2 == null || k2.length == 0 ? null : new FieldList(k2);
-
- if (this.keys1 != null) {
- if (this.keys2 != null) {
- if (this.keys1.size() != this.keys2.size()) {
- throw new CompilerException("Unequal number of key fields on the two inputs.");
- }
- } else {
- throw new CompilerException("Keys are set on first input, but not on second.");
- }
- } else if (this.keys2 != null) {
- throw new CompilerException("Keys are set on second input, but not on first.");
- }
- }
-
- // ------------------------------------------------------------------------
-
- @Override
- public DualInputOperator<?, ?, ?, ?> getOperator() {
- return (DualInputOperator<?, ?, ?, ?>) super.getOperator();
- }
-
- /**
- * Gets the <tt>DagConnection</tt> through which this node receives its <i>first</i> input.
- *
- * @return The first input connection.
- */
- public DagConnection getFirstIncomingConnection() {
- return this.input1;
- }
-
- /**
- * Gets the <tt>DagConnection</tt> through which this node receives its <i>second</i> input.
- *
- * @return The second input connection.
- */
- public DagConnection getSecondIncomingConnection() {
- return this.input2;
- }
-
- public OptimizerNode getFirstPredecessorNode() {
- if (this.input1 != null) {
- return this.input1.getSource();
- } else {
- return null;
- }
- }
-
- public OptimizerNode getSecondPredecessorNode() {
- if (this.input2 != null) {
- return this.input2.getSource();
- } else {
- return null;
- }
- }
-
- @Override
- public List<DagConnection> getIncomingConnections() {
- ArrayList<DagConnection> inputs = new ArrayList<DagConnection>(2);
- inputs.add(input1);
- inputs.add(input2);
- return inputs;
- }
-
-
- @Override
- public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, ExecutionMode defaultExecutionMode) {
- // see if there is a hint that dictates which shipping strategy to use for BOTH inputs
- final Configuration conf = getOperator().getParameters();
- ShipStrategyType preSet1 = null;
- ShipStrategyType preSet2 = null;
-
- String shipStrategy = conf.getString(Optimizer.HINT_SHIP_STRATEGY, null);
- if (shipStrategy != null) {
- if (Optimizer.HINT_SHIP_STRATEGY_FORWARD.equals(shipStrategy)) {
- preSet1 = preSet2 = ShipStrategyType.FORWARD;
- } else if (Optimizer.HINT_SHIP_STRATEGY_BROADCAST.equals(shipStrategy)) {
- preSet1 = preSet2 = ShipStrategyType.BROADCAST;
- } else if (Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH.equals(shipStrategy)) {
- preSet1 = preSet2 = ShipStrategyType.PARTITION_HASH;
- } else if (Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE.equals(shipStrategy)) {
- preSet1 = preSet2 = ShipStrategyType.PARTITION_RANGE;
- } else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) {
- preSet1 = preSet2 = ShipStrategyType.PARTITION_RANDOM;
- } else {
- throw new CompilerException("Unknown hint for shipping strategy: " + shipStrategy);
- }
- }
-
- // see if there is a hint that dictates which shipping strategy to use for the FIRST input
- shipStrategy = conf.getString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT, null);
- if (shipStrategy != null) {
- if (Optimizer.HINT_SHIP_STRATEGY_FORWARD.equals(shipStrategy)) {
- preSet1 = ShipStrategyType.FORWARD;
- } else if (Optimizer.HINT_SHIP_STRATEGY_BROADCAST.equals(shipStrategy)) {
- preSet1 = ShipStrategyType.BROADCAST;
- } else if (Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH.equals(shipStrategy)) {
- preSet1 = ShipStrategyType.PARTITION_HASH;
- } else if (Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE.equals(shipStrategy)) {
- preSet1 = ShipStrategyType.PARTITION_RANGE;
- } else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) {
- preSet1 = ShipStrategyType.PARTITION_RANDOM;
- } else {
- throw new CompilerException("Unknown hint for shipping strategy of input one: " + shipStrategy);
- }
- }
-
- // see if there is a hint that dictates which shipping strategy to use for the SECOND input
- shipStrategy = conf.getString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT, null);
- if (shipStrategy != null) {
- if (Optimizer.HINT_SHIP_STRATEGY_FORWARD.equals(shipStrategy)) {
- preSet2 = ShipStrategyType.FORWARD;
- } else if (Optimizer.HINT_SHIP_STRATEGY_BROADCAST.equals(shipStrategy)) {
- preSet2 = ShipStrategyType.BROADCAST;
- } else if (Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH.equals(shipStrategy)) {
- preSet2 = ShipStrategyType.PARTITION_HASH;
- } else if (Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE.equals(shipStrategy)) {
- preSet2 = ShipStrategyType.PARTITION_RANGE;
- } else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) {
- preSet2 = ShipStrategyType.PARTITION_RANDOM;
- } else {
- throw new CompilerException("Unknown hint for shipping strategy of input two: " + shipStrategy);
- }
- }
-
- // get the predecessors
- DualInputOperator<?, ?, ?, ?> contr = getOperator();
-
- Operator<?> leftPred = contr.getFirstInput();
- Operator<?> rightPred = contr.getSecondInput();
-
- OptimizerNode pred1;
- DagConnection conn1;
- if (leftPred == null) {
- throw new CompilerException("Error: Node for '" + getOperator().getName() + "' has no input set for first input.");
- } else {
- pred1 = contractToNode.get(leftPred);
- conn1 = new DagConnection(pred1, this, defaultExecutionMode);
- if (preSet1 != null) {
- conn1.setShipStrategy(preSet1);
- }
- }
-
- // create the connection and add it
- this.input1 = conn1;
- pred1.addOutgoingConnection(conn1);
-
- OptimizerNode pred2;
- DagConnection conn2;
- if (rightPred == null) {
- throw new CompilerException("Error: Node for '" + getOperator().getName() + "' has no input set for second input.");
- } else {
- pred2 = contractToNode.get(rightPred);
- conn2 = new DagConnection(pred2, this, defaultExecutionMode);
- if (preSet2 != null) {
- conn2.setShipStrategy(preSet2);
- }
- }
-
- // create the connection and add it
- this.input2 = conn2;
- pred2.addOutgoingConnection(conn2);
- }
-
- protected abstract List<OperatorDescriptorDual> getPossibleProperties();
-
- private List<OperatorDescriptorDual> getProperties() {
- if (this.cachedDescriptors == null) {
- this.cachedDescriptors = getPossibleProperties();
- }
- return this.cachedDescriptors;
- }
-
- @Override
- public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
- // get what we inherit and what is preserved by our user code
- final InterestingProperties props1 = getInterestingProperties().filterByCodeAnnotations(this, 0);
- final InterestingProperties props2 = getInterestingProperties().filterByCodeAnnotations(this, 1);
-
- // add all properties relevant to this node
- for (OperatorDescriptorDual dpd : getProperties()) {
- for (GlobalPropertiesPair gp : dpd.getPossibleGlobalProperties()) {
- // input 1
- props1.addGlobalProperties(gp.getProperties1());
-
- // input 2
- props2.addGlobalProperties(gp.getProperties2());
- }
- for (LocalPropertiesPair lp : dpd.getPossibleLocalProperties()) {
- // input 1
- props1.addLocalProperties(lp.getProperties1());
-
- // input 2
- props2.addLocalProperties(lp.getProperties2());
- }
- }
- this.input1.setInterestingProperties(props1);
- this.input2.setInterestingProperties(props2);
-
- for (DagConnection conn : getBroadcastConnections()) {
- conn.setInterestingProperties(new InterestingProperties());
- }
- }
-
- @Override
- public List<PlanNode> getAlternativePlans(CostEstimator estimator) {
- // check if we have a cached version
- if (this.cachedPlans != null) {
- return this.cachedPlans;
- }
-
- boolean childrenSkippedDueToReplicatedInput = false;
-
- // step down to all producer nodes and calculate alternative plans
- final List<? extends PlanNode> subPlans1 = getFirstPredecessorNode().getAlternativePlans(estimator);
- final List<? extends PlanNode> subPlans2 = getSecondPredecessorNode().getAlternativePlans(estimator);
-
- // get the interesting properties of the inputs
- final Set<RequestedGlobalProperties> intGlobal1 = this.input1.getInterestingProperties().getGlobalProperties();
- final Set<RequestedGlobalProperties> intGlobal2 = this.input2.getInterestingProperties().getGlobalProperties();
-
- // calculate alternative sub-plans for broadcast inputs
- final List<Set<? extends NamedChannel>> broadcastPlanChannels = new ArrayList<Set<? extends NamedChannel>>();
- List<DagConnection> broadcastConnections = getBroadcastConnections();
- List<String> broadcastConnectionNames = getBroadcastConnectionNames();
-
- for (int i = 0; i < broadcastConnections.size(); i++) {
- DagConnection broadcastConnection = broadcastConnections.get(i);
- String broadcastConnectionName = broadcastConnectionNames.get(i);
- List<PlanNode> broadcastPlanCandidates = broadcastConnection.getSource().getAlternativePlans(estimator);
-
- // wrap the plan candidates in named channels
- HashSet<NamedChannel> broadcastChannels = new HashSet<NamedChannel>(broadcastPlanCandidates.size());
- for (PlanNode plan: broadcastPlanCandidates) {
- final NamedChannel c = new NamedChannel(broadcastConnectionName, plan);
- DataExchangeMode exMode = DataExchangeMode.select(broadcastConnection.getDataExchangeMode(),
- ShipStrategyType.BROADCAST, broadcastConnection.isBreakingPipeline());
- c.setShipStrategy(ShipStrategyType.BROADCAST, exMode);
- broadcastChannels.add(c);
- }
- broadcastPlanChannels.add(broadcastChannels);
- }
-
- final GlobalPropertiesPair[] allGlobalPairs;
- final LocalPropertiesPair[] allLocalPairs;
- {
- Set<GlobalPropertiesPair> pairsGlob = new HashSet<GlobalPropertiesPair>();
- Set<LocalPropertiesPair> pairsLoc = new HashSet<LocalPropertiesPair>();
- for (OperatorDescriptorDual ods : getProperties()) {
- pairsGlob.addAll(ods.getPossibleGlobalProperties());
- pairsLoc.addAll(ods.getPossibleLocalProperties());
- }
- allGlobalPairs = pairsGlob.toArray(new GlobalPropertiesPair[pairsGlob.size()]);
- allLocalPairs = pairsLoc.toArray(new LocalPropertiesPair[pairsLoc.size()]);
- }
-
- final ArrayList<PlanNode> outputPlans = new ArrayList<PlanNode>();
-
- final ExecutionMode input1Mode = this.input1.getDataExchangeMode();
- final ExecutionMode input2Mode = this.input2.getDataExchangeMode();
-
- final int dop = getParallelism();
- final int inDop1 = getFirstPredecessorNode().getParallelism();
- final int inDop2 = getSecondPredecessorNode().getParallelism();
-
- final boolean dopChange1 = dop != inDop1;
- final boolean dopChange2 = dop != inDop2;
-
- final boolean input1breaksPipeline = this.input1.isBreakingPipeline();
- final boolean input2breaksPipeline = this.input2.isBreakingPipeline();
-
- // enumerate all pairwise combination of the children's plans together with
- // all possible operator strategy combination
-
- // create all candidates
- for (PlanNode child1 : subPlans1) {
-
- if (child1.getGlobalProperties().isFullyReplicated()) {
- // fully replicated input is always locally forwarded if DOP is not changed
- if (dopChange1) {
- // can not continue with this child
- childrenSkippedDueToReplicatedInput = true;
- continue;
- } else {
- this.input1.setShipStrategy(ShipStrategyType.FORWARD);
- }
- }
-
- for (PlanNode child2 : subPlans2) {
-
- if (child2.getGlobalProperties().isFullyReplicated()) {
- // fully replicated input is always locally forwarded if DOP is not changed
- if (dopChange2) {
- // can not continue with this child
- childrenSkippedDueToReplicatedInput = true;
- continue;
- } else {
- this.input2.setShipStrategy(ShipStrategyType.FORWARD);
- }
- }
-
- // check that the children go together: that is the case if they build upon the same
- // candidate at the branch point where their plans join
- if (!areBranchCompatible(child1, child2)) {
- continue;
- }
-
- for (RequestedGlobalProperties igps1: intGlobal1) {
- // create a candidate channel for the first input. mark it cached if the connection says so
- final Channel c1 = new Channel(child1, this.input1.getMaterializationMode());
- if (this.input1.getShipStrategy() == null) {
- // free to choose the ship strategy
- igps1.parameterizeChannel(c1, dopChange1, input1Mode, input1breaksPipeline);
-
- // if the DOP changed, make sure that we cancel out properties, unless the
- // ship strategy preserves/establishes them even under changing DOPs
- if (dopChange1 && !c1.getShipStrategy().isNetworkStrategy()) {
- c1.getGlobalProperties().reset();
- }
- }
- else {
- // ship strategy fixed by compiler hint
- ShipStrategyType shipType = this.input1.getShipStrategy();
- DataExchangeMode exMode = DataExchangeMode.select(input1Mode, shipType, input1breaksPipeline);
- if (this.keys1 != null) {
- c1.setShipStrategy(shipType, this.keys1.toFieldList(), exMode);
- }
- else {
- c1.setShipStrategy(shipType, exMode);
- }
-
- if (dopChange1) {
- c1.adjustGlobalPropertiesForFullParallelismChange();
- }
- }
-
- for (RequestedGlobalProperties igps2: intGlobal2) {
- // create a candidate channel for the second input. mark it cached if the connection says so
- final Channel c2 = new Channel(child2, this.input2.getMaterializationMode());
- if (this.input2.getShipStrategy() == null) {
- // free to choose the ship strategy
- igps2.parameterizeChannel(c2, dopChange2, input2Mode, input2breaksPipeline);
-
- // if the DOP changed, make sure that we cancel out properties, unless the
- // ship strategy preserves/establishes them even under changing DOPs
- if (dopChange2 && !c2.getShipStrategy().isNetworkStrategy()) {
- c2.getGlobalProperties().reset();
- }
- } else {
- // ship strategy fixed by compiler hint
- ShipStrategyType shipType = this.input2.getShipStrategy();
- DataExchangeMode exMode = DataExchangeMode.select(input2Mode, shipType, input2breaksPipeline);
- if (this.keys2 != null) {
- c2.setShipStrategy(shipType, this.keys2.toFieldList(), exMode);
- } else {
- c2.setShipStrategy(shipType, exMode);
- }
-
- if (dopChange2) {
- c2.adjustGlobalPropertiesForFullParallelismChange();
- }
- }
-
- /* ********************************************************************
- * NOTE: Depending on how we proceed with different partitioning,
- * we might at some point need a compatibility check between
- * the pairs of global properties.
- * *******************************************************************/
-
- outer:
- for (GlobalPropertiesPair gpp : allGlobalPairs) {
- if (gpp.getProperties1().isMetBy(c1.getGlobalProperties()) &&
- gpp.getProperties2().isMetBy(c2.getGlobalProperties()) )
- {
- for (OperatorDescriptorDual desc : getProperties()) {
- if (desc.areCompatible(gpp.getProperties1(), gpp.getProperties2(),
- c1.getGlobalProperties(), c2.getGlobalProperties()))
- {
- Channel c1Clone = c1.clone();
- c1Clone.setRequiredGlobalProps(gpp.getProperties1());
- c2.setRequiredGlobalProps(gpp.getProperties2());
-
- // we form a valid combination, so create the local candidates
- // for this
- addLocalCandidates(c1Clone, c2, broadcastPlanChannels, igps1, igps2,
- outputPlans, allLocalPairs, estimator);
- break outer;
- }
- }
- }
- }
-
- // break the loop over input2's possible global properties if the property
- // is fixed via a hint. All the properties are overridden by the hint anyway,
- // so we can stop after the first
- if (this.input2.getShipStrategy() != null) {
- break;
- }
- }
-
- // break the loop over input1's possible global properties if the property
- // is fixed via a hint. All the properties are overridden by the hint anyway,
- // so we can stop after the first
- if (this.input1.getShipStrategy() != null) {
- break;
- }
- }
- }
- }
-
- if (outputPlans.isEmpty()) {
- if (childrenSkippedDueToReplicatedInput) {
- throw new CompilerException("No plan meeting the requirements could be created @ " + this
- + ". Most likely reason: Invalid use of replicated input.");
- } else {
- throw new CompilerException("No plan meeting the requirements could be created @ " + this
- + ". Most likely reason: Too restrictive plan hints.");
- }
- }
-
- // cost and prune the plans
- for (PlanNode node : outputPlans) {
- estimator.costOperator(node);
- }
- prunePlanAlternatives(outputPlans);
- outputPlans.trimToSize();
-
- this.cachedPlans = outputPlans;
- return outputPlans;
- }
-
- protected void addLocalCandidates(Channel template1, Channel template2, List<Set<? extends NamedChannel>> broadcastPlanChannels,
- RequestedGlobalProperties rgps1, RequestedGlobalProperties rgps2,
- List<PlanNode> target, LocalPropertiesPair[] validLocalCombinations, CostEstimator estimator)
- {
- for (RequestedLocalProperties ilp1 : this.input1.getInterestingProperties().getLocalProperties()) {
- final Channel in1 = template1.clone();
- ilp1.parameterizeChannel(in1);
-
- for (RequestedLocalProperties ilp2 : this.input2.getInterestingProperties().getLocalProperties()) {
- final Channel in2 = template2.clone();
- ilp2.parameterizeChannel(in2);
-
- for (OperatorDescriptorDual dps: getProperties()) {
- for (LocalPropertiesPair lpp : dps.getPossibleLocalProperties()) {
- if (lpp.getProperties1().isMetBy(in1.getLocalProperties()) &&
- lpp.getProperties2().isMetBy(in2.getLocalProperties()) )
- {
- // valid combination
- // for non-trivial local properties, we need to check that they are co-compatible
- // (such as when a sort order is requested, both must request the same sort order)
- if (dps.areCoFulfilled(lpp.getProperties1(), lpp.getProperties2(),
- in1.getLocalProperties(), in2.getLocalProperties()))
- {
- // copy, because setting required properties and instantiation may
- // change the channels and should not affect prior candidates
- Channel in1Copy = in1.clone();
- in1Copy.setRequiredLocalProps(lpp.getProperties1());
-
- Channel in2Copy = in2.clone();
- in2Copy.setRequiredLocalProps(lpp.getProperties2());
-
- // all right, co-compatible
- instantiate(dps, in1Copy, in2Copy, broadcastPlanChannels, target, estimator, rgps1, rgps2, ilp1, ilp2);
- break;
- }
- // else cannot use this pair, fall through the loop and try the next one
- }
- }
- }
- }
- }
- }
-
- protected void instantiate(OperatorDescriptorDual operator, Channel in1, Channel in2,
- List<Set<? extends NamedChannel>> broadcastPlanChannels, List<PlanNode> target, CostEstimator estimator,
- RequestedGlobalProperties globPropsReq1, RequestedGlobalProperties globPropsReq2,
- RequestedLocalProperties locPropsReq1, RequestedLocalProperties locPropsReq2)
- {
- final PlanNode inputSource1 = in1.getSource();
- final PlanNode inputSource2 = in2.getSource();
-
- for (List<NamedChannel> broadcastChannelsCombination: Sets.cartesianProduct(broadcastPlanChannels)) {
-
- boolean validCombination = true;
-
- // check whether the broadcast inputs use the same plan candidate at the branching point
- for (int i = 0; i < broadcastChannelsCombination.size(); i++) {
- NamedChannel nc = broadcastChannelsCombination.get(i);
- PlanNode bcSource = nc.getSource();
-
- if (!(areBranchCompatible(bcSource, inputSource1) || areBranchCompatible(bcSource, inputSource2))) {
- validCombination = false;
- break;
- }
-
- // check branch compatibility against all other broadcast variables
- for (int k = 0; k < i; k++) {
- PlanNode otherBcSource = broadcastChannelsCombination.get(k).getSource();
-
- if (!areBranchCompatible(bcSource, otherBcSource)) {
- validCombination = false;
- break;
- }
- }
- }
-
- if (!validCombination) {
- continue;
- }
-
- placePipelineBreakersIfNecessary(operator.getStrategy(), in1, in2);
-
- DualInputPlanNode node = operator.instantiate(in1, in2, this);
- node.setBroadcastInputs(broadcastChannelsCombination);
-
- SemanticProperties props = this.getSemanticProperties();
- GlobalProperties gp1 = in1.getGlobalProperties().clone().filterBySemanticProperties(props, 0);
- GlobalProperties gp2 = in2.getGlobalProperties().clone().filterBySemanticProperties(props, 1);
- GlobalProperties combined = operator.computeGlobalProperties(gp1, gp2);
-
- LocalProperties lp1 = in1.getLocalProperties().clone().filterBySemanticProperties(props, 0);
- LocalProperties lp2 = in2.getLocalProperties().clone().filterBySemanticProperties(props, 1);
- LocalProperties locals = operator.computeLocalProperties(lp1, lp2);
-
- node.initProperties(combined, locals);
- node.updatePropertiesWithUniqueSets(getUniqueFields());
- target.add(node);
- }
- }
-
- protected void placePipelineBreakersIfNecessary(DriverStrategy strategy, Channel in1, Channel in2) {
- // before we instantiate, check for deadlocks by tracing back to the open branches and checking
- // whether either none of the inputs, or all of them, have a dam on the path
- if (this.hereJoinedBranches != null && this.hereJoinedBranches.size() > 0) {
- boolean someDamOnLeftPaths = false;
- boolean damOnAllLeftPaths = true;
- boolean someDamOnRightPaths = false;
- boolean damOnAllRightPaths = true;
-
- if (strategy.firstDam() == DamBehavior.FULL_DAM || in1.getLocalStrategy().dams() || in1.getTempMode().breaksPipeline()) {
- someDamOnLeftPaths = true;
- } else {
- for (OptimizerNode brancher : this.hereJoinedBranches) {
- PlanNode candAtBrancher = in1.getSource().getCandidateAtBranchPoint(brancher);
-
- // not all candidates are found, because this list includes joined branches from both regular inputs and broadcast vars
- if (candAtBrancher == null) {
- continue;
- }
-
- SourceAndDamReport res = in1.getSource().hasDamOnPathDownTo(candAtBrancher);
- if (res == NOT_FOUND) {
- throw new CompilerException("Bug: Tracing dams for deadlock detection is broken.");
- } else if (res == FOUND_SOURCE) {
- damOnAllLeftPaths = false;
- } else if (res == FOUND_SOURCE_AND_DAM) {
- someDamOnLeftPaths = true;
- } else {
- throw new CompilerException();
- }
- }
- }
-
- if (strategy.secondDam() == DamBehavior.FULL_DAM || in2.getLocalStrategy().dams() || in2.getTempMode().breaksPipeline()) {
- someDamOnRightPaths = true;
- } else {
- for (OptimizerNode brancher : this.hereJoinedBranches) {
- PlanNode candAtBrancher = in2.getSource().getCandidateAtBranchPoint(brancher);
-
- // not all candidates are found, because this list includes joined branches from both regular inputs and broadcast vars
- if (candAtBrancher == null) {
- continue;
- }
-
- SourceAndDamReport res = in2.getSource().hasDamOnPathDownTo(candAtBrancher);
- if (res == NOT_FOUND) {
- throw new CompilerException("Bug: Tracing dams for deadlock detection is broken.");
- } else if (res == FOUND_SOURCE) {
- damOnAllRightPaths = false;
- } else if (res == FOUND_SOURCE_AND_DAM) {
- someDamOnRightPaths = true;
- } else {
- throw new CompilerException();
- }
- }
- }
-
- // acceptable combinations are dams on all paths of both inputs, or no dams at all
- if ( (damOnAllLeftPaths & damOnAllRightPaths) | (!someDamOnLeftPaths & !someDamOnRightPaths) ) {
- // good, either both materialize already on the way, or both fully pipeline
- } else {
- if (someDamOnLeftPaths & !damOnAllRightPaths) {
- // right needs a pipeline breaker
- in2.setTempMode(in2.getTempMode().makePipelineBreaker());
- }
-
- if (someDamOnRightPaths & !damOnAllLeftPaths) {
- // left needs a pipeline breaker
- in1.setTempMode(in1.getTempMode().makePipelineBreaker());
- }
- }
- }
- }
-
- @Override
- public void computeUnclosedBranchStack() {
- if (this.openBranches != null) {
- return;
- }
-
- // handle the data flow branching for the regular inputs
- addClosedBranches(getFirstPredecessorNode().closedBranchingNodes);
- addClosedBranches(getSecondPredecessorNode().closedBranchingNodes);
-
- List<UnclosedBranchDescriptor> result1 = getFirstPredecessorNode().getBranchesForParent(getFirstIncomingConnection());
- List<UnclosedBranchDescriptor> result2 = getSecondPredecessorNode().getBranchesForParent(getSecondIncomingConnection());
-
- ArrayList<UnclosedBranchDescriptor> inputsMerged = new ArrayList<UnclosedBranchDescriptor>();
- mergeLists(result1, result2, inputsMerged, true);
-
- // handle the data flow branching for the broadcast inputs
- List<UnclosedBranchDescriptor> result = computeUnclosedBranchStackForBroadcastInputs(inputsMerged);
-
- this.openBranches = (result == null || result.isEmpty()) ? Collections.<UnclosedBranchDescriptor>emptyList() : result;
- }
-
- @Override
- public SemanticProperties getSemanticProperties() {
- return getOperator().getSemanticProperties();
- }
-
- // --------------------------------------------------------------------------------------------
- // Miscellaneous
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void accept(Visitor<OptimizerNode> visitor) {
- if (visitor.preVisit(this)) {
- if (this.input1 == null || this.input2 == null) {
- throw new CompilerException();
- }
-
- getFirstPredecessorNode().accept(visitor);
- getSecondPredecessorNode().accept(visitor);
-
- for (DagConnection connection : getBroadcastConnections()) {
- connection.getSource().accept(visitor);
- }
-
- visitor.postVisit(this);
- }
- }
-}
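The deadlock-avoidance rule at the end of placePipelineBreakersIfNecessary above can be read in isolation: a two-input plan is safe if both inputs materialize on all paths from a shared branch point, or if neither does on any path; otherwise the side that might pipeline through gets an artificial breaker. A standalone sketch of that rule (the class and method names are illustrative, not Flink API):

public class DamRuleSketch {

    /** Mirrors the boolean decision at the end of placePipelineBreakersIfNecessary. */
    static String decide(boolean someDamLeft, boolean damOnAllLeft,
                         boolean someDamRight, boolean damOnAllRight) {
        // safe: both sides materialize on every path, or both fully pipeline
        if ((damOnAllLeft && damOnAllRight) || (!someDamLeft && !someDamRight)) {
            return "no breaker needed";
        }
        StringBuilder sb = new StringBuilder();
        // left dams on some path while the right may pipeline through
        if (someDamLeft && !damOnAllRight) {
            sb.append("break pipeline of right input; ");
        }
        // mirror case for the other side
        if (someDamRight && !damOnAllLeft) {
            sb.append("break pipeline of left input; ");
        }
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(decide(true, true, false, false));   // dam only on left -> break right
        System.out.println(decide(true, true, true, true));     // dams everywhere -> safe
        System.out.println(decide(false, false, false, false)); // no dams at all -> safe
    }
}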
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java
deleted file mode 100644
index 45ecdac..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
-
-
-public class UnaryOperatorNode extends SingleInputNode {
-
- private final List<OperatorDescriptorSingle> operator;
-
- private final String name;
-
-
-
- public UnaryOperatorNode(String name, FieldSet keys, OperatorDescriptorSingle... operators) {
- this(name, keys, Arrays.asList(operators));
- }
-
- public UnaryOperatorNode(String name, FieldSet keys, List<OperatorDescriptorSingle> operators) {
- super(keys);
-
- this.operator = operators;
- this.name = name;
- }
-
- @Override
- protected List<OperatorDescriptorSingle> getPossibleProperties() {
- return this.operator;
- }
-
- @Override
- public String getName() {
- return this.name;
- }
-
- @Override
- public SemanticProperties getSemanticProperties() {
- return new SingleInputSemanticProperties.AllFieldsForwardedProperties();
- }
-
- @Override
- protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
- // we have no estimates by default
- }
-}
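UnaryOperatorNode above is a thin internal wrapper; for instance, WorksetIterationNode (next file) instantiates one as the "Solution-Set Delta" auxiliary node. A minimal construction sketch mirroring that usage, with an illustrative key field:

import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.optimizer.dag.UnaryOperatorNode;
import org.apache.flink.optimizer.operators.SolutionSetDeltaOperator;

public class UnaryOperatorNodeSketch {

    public static void main(String[] args) {
        FieldList keys = new FieldList(0); // illustrative solution set key field

        UnaryOperatorNode node = new UnaryOperatorNode(
                "Solution-Set Delta", keys, new SolutionSetDeltaOperator(keys));

        System.out.println(node.getName()); // prints "Solution-Set Delta"
    }
}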
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
deleted file mode 100644
index e85f289..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
+++ /dev/null
@@ -1,589 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
-import org.apache.flink.api.common.operators.base.DeltaIterationBase;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.traversals.InterestingPropertyVisitor;
-import org.apache.flink.optimizer.costs.CostEstimator;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.InterestingProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
-import org.apache.flink.optimizer.operators.SolutionSetDeltaOperator;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.NamedChannel;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SolutionSetPlanNode;
-import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
-import org.apache.flink.optimizer.plan.WorksetPlanNode;
-import org.apache.flink.optimizer.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport;
-import org.apache.flink.optimizer.util.NoOpBinaryUdfOp;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.types.Nothing;
-import org.apache.flink.util.Visitor;
-
-/**
- * A node in the optimizer's program representation for a workset iteration.
- */
-public class WorksetIterationNode extends TwoInputNode implements IterationNode {
-
- private static final int DEFAULT_COST_WEIGHT = 20;
-
-
- private final FieldList solutionSetKeyFields;
-
- private final GlobalProperties partitionedProperties;
-
- private final List<OperatorDescriptorDual> dataProperties;
-
- private SolutionSetNode solutionSetNode;
-
- private WorksetNode worksetNode;
-
- private OptimizerNode solutionSetDelta;
-
- private OptimizerNode nextWorkset;
-
- private DagConnection solutionSetDeltaRootConnection;
-
- private DagConnection nextWorksetRootConnection;
-
- private SingleRootJoiner singleRoot;
-
- private boolean solutionDeltaImmediatelyAfterSolutionJoin;
-
- private final int costWeight;
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Creates a new workset iteration node for the optimizer plan.
- *
- * @param iteration The iteration operator that the node represents.
- */
- public WorksetIterationNode(DeltaIterationBase<?, ?> iteration) {
- super(iteration);
-
- final int[] ssKeys = iteration.getSolutionSetKeyFields();
- if (ssKeys == null || ssKeys.length == 0) {
- throw new CompilerException("Invalid WorksetIteration: No key fields defined for the solution set.");
- }
- this.solutionSetKeyFields = new FieldList(ssKeys);
- this.partitionedProperties = new GlobalProperties();
- this.partitionedProperties.setHashPartitioned(this.solutionSetKeyFields);
-
- int weight = iteration.getMaximumNumberOfIterations() > 0 ?
- iteration.getMaximumNumberOfIterations() : DEFAULT_COST_WEIGHT;
-
- if (weight > OptimizerNode.MAX_DYNAMIC_PATH_COST_WEIGHT) {
- weight = OptimizerNode.MAX_DYNAMIC_PATH_COST_WEIGHT;
- }
- this.costWeight = weight;
-
- this.dataProperties = Collections.<OperatorDescriptorDual>singletonList(new WorksetOpDescriptor(this.solutionSetKeyFields));
- }
-
- // --------------------------------------------------------------------------------------------
-
- public DeltaIterationBase<?, ?> getIterationContract() {
- return (DeltaIterationBase<?, ?>) getOperator();
- }
-
- public SolutionSetNode getSolutionSetNode() {
- return this.solutionSetNode;
- }
-
- public WorksetNode getWorksetNode() {
- return this.worksetNode;
- }
-
- public OptimizerNode getNextWorkset() {
- return this.nextWorkset;
- }
-
- public OptimizerNode getSolutionSetDelta() {
- return this.solutionSetDelta;
- }
-
- public void setPartialSolution(SolutionSetNode solutionSetNode, WorksetNode worksetNode) {
- if (this.solutionSetNode != null || this.worksetNode != null) {
- throw new IllegalStateException("Error: Initializing WorksetIterationNode multiple times.");
- }
- this.solutionSetNode = solutionSetNode;
- this.worksetNode = worksetNode;
- }
-
- public void setNextPartialSolution(OptimizerNode solutionSetDelta, OptimizerNode nextWorkset,
- ExecutionMode executionMode) {
-
- // check whether the solution set delta is itself the join with
- // the solution set (so we can potentially do direct updates)
- if (solutionSetDelta instanceof TwoInputNode) {
- TwoInputNode solutionDeltaTwoInput = (TwoInputNode) solutionSetDelta;
- if (solutionDeltaTwoInput.getFirstPredecessorNode() == this.solutionSetNode ||
- solutionDeltaTwoInput.getSecondPredecessorNode() == this.solutionSetNode)
- {
- this.solutionDeltaImmediatelyAfterSolutionJoin = true;
- }
- }
-
- // there needs to be at least one node in the workset path, so
- // if the next workset is equal to the workset, we need to inject a no-op node
- if (nextWorkset == worksetNode || nextWorkset instanceof BinaryUnionNode) {
- NoOpNode noop = new NoOpNode();
- noop.setDegreeOfParallelism(getParallelism());
-
- DagConnection noOpConn = new DagConnection(nextWorkset, noop, executionMode);
- noop.setIncomingConnection(noOpConn);
- nextWorkset.addOutgoingConnection(noOpConn);
-
- nextWorkset = noop;
- }
-
- // attach an extra node to the solution set delta for the cases where we need to repartition
- UnaryOperatorNode solutionSetDeltaUpdateAux = new UnaryOperatorNode("Solution-Set Delta", getSolutionSetKeyFields(),
- new SolutionSetDeltaOperator(getSolutionSetKeyFields()));
- solutionSetDeltaUpdateAux.setDegreeOfParallelism(getParallelism());
-
- DagConnection conn = new DagConnection(solutionSetDelta, solutionSetDeltaUpdateAux, executionMode);
- solutionSetDeltaUpdateAux.setIncomingConnection(conn);
- solutionSetDelta.addOutgoingConnection(conn);
-
- this.solutionSetDelta = solutionSetDeltaUpdateAux;
- this.nextWorkset = nextWorkset;
-
- this.singleRoot = new SingleRootJoiner();
- this.solutionSetDeltaRootConnection = new DagConnection(solutionSetDeltaUpdateAux,
- this.singleRoot, executionMode);
-
- this.nextWorksetRootConnection = new DagConnection(nextWorkset, this.singleRoot, executionMode);
- this.singleRoot.setInputs(this.solutionSetDeltaRootConnection, this.nextWorksetRootConnection);
-
- solutionSetDeltaUpdateAux.addOutgoingConnection(this.solutionSetDeltaRootConnection);
- nextWorkset.addOutgoingConnection(this.nextWorksetRootConnection);
- }
-
- public int getCostWeight() {
- return this.costWeight;
- }
-
- public TwoInputNode getSingleRootOfStepFunction() {
- return this.singleRoot;
- }
-
- public FieldList getSolutionSetKeyFields() {
- return this.solutionSetKeyFields;
- }
-
- public OptimizerNode getInitialSolutionSetPredecessorNode() {
- return getFirstPredecessorNode();
- }
-
- public OptimizerNode getInitialWorksetPredecessorNode() {
- return getSecondPredecessorNode();
- }
-
- // --------------------------------------------------------------------------------------------
-
- @Override
- public String getName() {
- return "Workset Iteration";
- }
-
- @Override
- public SemanticProperties getSemanticProperties() {
- return new EmptySemanticProperties();
- }
-
- protected void readStubAnnotations() {}
-
- @Override
- protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
- this.estimatedOutputSize = getFirstPredecessorNode().getEstimatedOutputSize();
- this.estimatedNumRecords = getFirstPredecessorNode().getEstimatedNumRecords();
- }
-
- // --------------------------------------------------------------------------------------------
- // Properties and Optimization
- // --------------------------------------------------------------------------------------------
-
- @Override
- protected List<OperatorDescriptorDual> getPossibleProperties() {
- return this.dataProperties;
- }
-
- @Override
- public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
- // our own solution (the solution set) is always partitioned and this cannot be adjusted
- // depending on what the successor to the workset iteration requests. for that reason,
- // we ignore incoming interesting properties.
-
- // in addition, we need to make 2 interesting property passes, because the root of the step function
- // that computes the next workset needs the interesting properties as generated by the
- // workset source of the step function. the second pass concerns only the workset path.
- // as initial interesting properties, we have the trivial ones for the step function,
- // and partitioned on the solution set key for the solution set delta
-
- RequestedGlobalProperties partitionedProperties = new RequestedGlobalProperties();
- partitionedProperties.setHashPartitioned(this.solutionSetKeyFields);
- InterestingProperties partitionedIP = new InterestingProperties();
- partitionedIP.addGlobalProperties(partitionedProperties);
- partitionedIP.addLocalProperties(new RequestedLocalProperties());
-
- this.nextWorksetRootConnection.setInterestingProperties(new InterestingProperties());
- this.solutionSetDeltaRootConnection.setInterestingProperties(partitionedIP.clone());
-
- InterestingPropertyVisitor ipv = new InterestingPropertyVisitor(estimator);
- this.nextWorkset.accept(ipv);
- this.solutionSetDelta.accept(ipv);
-
- // take the interesting properties of the partial solution and add them to the root interesting properties
- InterestingProperties worksetIntProps = this.worksetNode.getInterestingProperties();
- InterestingProperties intProps = new InterestingProperties();
- intProps.getGlobalProperties().addAll(worksetIntProps.getGlobalProperties());
- intProps.getLocalProperties().addAll(worksetIntProps.getLocalProperties());
-
- // clear all interesting properties to prepare the second traversal
- this.nextWorksetRootConnection.clearInterestingProperties();
- this.nextWorkset.accept(InterestingPropertiesClearer.INSTANCE);
-
- // 2nd pass
- this.nextWorksetRootConnection.setInterestingProperties(intProps);
- this.nextWorkset.accept(ipv);
-
- // now add the interesting properties of the workset to the workset input
- final InterestingProperties inProps = this.worksetNode.getInterestingProperties().clone();
- inProps.addGlobalProperties(new RequestedGlobalProperties());
- inProps.addLocalProperties(new RequestedLocalProperties());
- this.input2.setInterestingProperties(inProps);
-
- // the partial solution must be hash partitioned, so it has only that as interesting properties
- this.input1.setInterestingProperties(partitionedIP);
- }
-
- @Override
- public void clearInterestingProperties() {
- super.clearInterestingProperties();
-
- this.nextWorksetRootConnection.clearInterestingProperties();
- this.solutionSetDeltaRootConnection.clearInterestingProperties();
-
- this.nextWorkset.accept(InterestingPropertiesClearer.INSTANCE);
- this.solutionSetDelta.accept(InterestingPropertiesClearer.INSTANCE);
- }
-
- @Override
- protected void instantiate(OperatorDescriptorDual operator, Channel solutionSetIn, Channel worksetIn,
- List<Set<? extends NamedChannel>> broadcastPlanChannels, List<PlanNode> target, CostEstimator estimator,
- RequestedGlobalProperties globPropsReqSolutionSet, RequestedGlobalProperties globPropsReqWorkset,
- RequestedLocalProperties locPropsReqSolutionSet, RequestedLocalProperties locPropsReqWorkset)
- {
- // check for pipeline breaking using hash join with build on the solution set side
- placePipelineBreakersIfNecessary(DriverStrategy.HYBRIDHASH_BUILD_FIRST, solutionSetIn, worksetIn);
-
- // NOTES ON THE ENUMERATION OF THE STEP FUNCTION PLANS:
- // Whenever we instantiate the iteration, we enumerate new candidates for the step function.
- // That way, we make sure that for each candidate for the initial partial solution,
- // we have a fitting candidate for the step function (often, work is pushed out of the step function).
- // Among the candidates of the step function, we keep only those that meet the requested properties of the
- // current candidate initial partial solution. That makes sure these properties exist at the beginning of
- // every iteration.
-
- // 1) Because we enumerate multiple times, we may need to clean the cached plans
- // before starting another enumeration
- this.nextWorkset.accept(PlanCacheCleaner.INSTANCE);
- this.solutionSetDelta.accept(PlanCacheCleaner.INSTANCE);
-
- // 2) Give the partial solution the properties of the current candidate for the initial partial solution
- // This concerns currently only the workset.
- this.worksetNode.setCandidateProperties(worksetIn.getGlobalProperties(), worksetIn.getLocalProperties(), worksetIn);
- this.solutionSetNode.setCandidateProperties(this.partitionedProperties, new LocalProperties(), solutionSetIn);
-
- final SolutionSetPlanNode sspn = this.solutionSetNode.getCurrentSolutionSetPlanNode();
- final WorksetPlanNode wspn = this.worksetNode.getCurrentWorksetPlanNode();
-
- // 3) Get the alternative plans
- List<PlanNode> solutionSetDeltaCandidates = this.solutionSetDelta.getAlternativePlans(estimator);
- List<PlanNode> worksetCandidates = this.nextWorkset.getAlternativePlans(estimator);
-
- // 4) Throw away all that are not compatible with the properties currently requested to the
- // initial partial solution
-
- // Make sure that the workset candidates fulfill the input requirements
- {
- List<PlanNode> newCandidates = new ArrayList<PlanNode>();
-
- for (Iterator<PlanNode> planDeleter = worksetCandidates.iterator(); planDeleter.hasNext(); ) {
- PlanNode candidate = planDeleter.next();
-
- GlobalProperties atEndGlobal = candidate.getGlobalProperties();
- LocalProperties atEndLocal = candidate.getLocalProperties();
-
- FeedbackPropertiesMeetRequirementsReport report = candidate.checkPartialSolutionPropertiesMet(wspn,
- atEndGlobal, atEndLocal);
-
- if (report == FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION) {
- // the candidate depends on the workset only through a broadcast variable; nothing to adjust
- }
- else if (report == FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
- // attach a no-op node through which we re-establish the requested properties of the original input
- Channel toNoOp = new Channel(candidate);
- globPropsReqWorkset.parameterizeChannel(toNoOp, false,
- nextWorksetRootConnection.getDataExchangeMode(), false);
- locPropsReqWorkset.parameterizeChannel(toNoOp);
-
- UnaryOperatorNode rebuildWorksetPropertiesNode = new UnaryOperatorNode("Rebuild Workset Properties",
- FieldList.EMPTY_LIST);
-
- rebuildWorksetPropertiesNode.setDegreeOfParallelism(candidate.getParallelism());
-
- SingleInputPlanNode rebuildWorksetPropertiesPlanNode = new SingleInputPlanNode(
- rebuildWorksetPropertiesNode, "Rebuild Workset Properties",
- toNoOp, DriverStrategy.UNARY_NO_OP);
- rebuildWorksetPropertiesPlanNode.initProperties(toNoOp.getGlobalProperties(),
- toNoOp.getLocalProperties());
- estimator.costOperator(rebuildWorksetPropertiesPlanNode);
-
- GlobalProperties atEndGlobalModified = rebuildWorksetPropertiesPlanNode.getGlobalProperties();
- LocalProperties atEndLocalModified = rebuildWorksetPropertiesPlanNode.getLocalProperties();
-
- if (!(atEndGlobalModified.equals(atEndGlobal) && atEndLocalModified.equals(atEndLocal))) {
- FeedbackPropertiesMeetRequirementsReport report2 = candidate.checkPartialSolutionPropertiesMet(
- wspn, atEndGlobalModified, atEndLocalModified);
- if (report2 != FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
- newCandidates.add(rebuildWorksetPropertiesPlanNode);
- }
- }
-
- // remove the original operator and add the modified candidate
- planDeleter.remove();
-
- }
- }
-
- worksetCandidates.addAll(newCandidates);
- }
-
- if (worksetCandidates.isEmpty()) {
- return;
- }
-
- // sanity check the solution set delta
- for (PlanNode solutionSetDeltaCandidate : solutionSetDeltaCandidates) {
- SingleInputPlanNode candidate = (SingleInputPlanNode) solutionSetDeltaCandidate;
- GlobalProperties gp = candidate.getGlobalProperties();
-
- if (gp.getPartitioning() != PartitioningProperty.HASH_PARTITIONED || gp.getPartitioningFields() == null ||
- !gp.getPartitioningFields().equals(this.solutionSetKeyFields)) {
- throw new CompilerException("Bug: The solution set delta is not partitioned.");
- }
- }
-
- // 5) Create a candidate for the Iteration Node for every remaining plan of the step function.
-
- final GlobalProperties gp = new GlobalProperties();
- gp.setHashPartitioned(this.solutionSetKeyFields);
- gp.addUniqueFieldCombination(this.solutionSetKeyFields);
-
- LocalProperties lp = LocalProperties.EMPTY.addUniqueFields(this.solutionSetKeyFields);
-
- // take all combinations of solution set delta and workset plans
- for (PlanNode solutionSetCandidate : solutionSetDeltaCandidates) {
- for (PlanNode worksetCandidate : worksetCandidates) {
- // check whether they have the same operator at their latest branching point
- if (this.singleRoot.areBranchCompatible(solutionSetCandidate, worksetCandidate)) {
-
- SingleInputPlanNode siSolutionDeltaCandidate = (SingleInputPlanNode) solutionSetCandidate;
- boolean immediateDeltaUpdate;
-
- // check whether we need a dedicated solution set delta operator, or whether we can update on the fly
- if (siSolutionDeltaCandidate.getInput().getShipStrategy() == ShipStrategyType.FORWARD &&
- this.solutionDeltaImmediatelyAfterSolutionJoin)
- {
- // we do not need this extra node. we can make the predecessor the delta
- // sanity check the node and connection
- if (siSolutionDeltaCandidate.getDriverStrategy() != DriverStrategy.UNARY_NO_OP ||
- siSolutionDeltaCandidate.getInput().getLocalStrategy() != LocalStrategy.NONE)
- {
- throw new CompilerException("Invalid Solution set delta node.");
- }
-
- solutionSetCandidate = siSolutionDeltaCandidate.getInput().getSource();
- immediateDeltaUpdate = true;
- } else {
- // was not partitioned, we need to keep this node.
- // mark that we materialize the input
- siSolutionDeltaCandidate.getInput().setTempMode(TempMode.PIPELINE_BREAKER);
- immediateDeltaUpdate = false;
- }
-
- WorksetIterationPlanNode wsNode = new WorksetIterationPlanNode(this,
- "WorksetIteration ("+this.getOperator().getName()+")", solutionSetIn,
- worksetIn, sspn, wspn, worksetCandidate, solutionSetCandidate);
- wsNode.setImmediateSolutionSetUpdate(immediateDeltaUpdate);
- wsNode.initProperties(gp, lp);
- target.add(wsNode);
- }
- }
- }
- }
-
- @Override
- public void computeUnclosedBranchStack() {
- if (this.openBranches != null) {
- return;
- }
-
- // IMPORTANT: First compute closed branches from the two inputs.
- // We need to do this because the runtime iteration head effectively joins the two inputs.
- addClosedBranches(getFirstPredecessorNode().closedBranchingNodes);
- addClosedBranches(getSecondPredecessorNode().closedBranchingNodes);
-
- List<UnclosedBranchDescriptor> result1 = getFirstPredecessorNode().getBranchesForParent(getFirstIncomingConnection());
- List<UnclosedBranchDescriptor> result2 = getSecondPredecessorNode().getBranchesForParent(getSecondIncomingConnection());
-
- ArrayList<UnclosedBranchDescriptor> inputsMerged1 = new ArrayList<UnclosedBranchDescriptor>();
- mergeLists(result1, result2, inputsMerged1, true); // this method also sets which branches are joined here (in the head)
-
- addClosedBranches(getSingleRootOfStepFunction().closedBranchingNodes);
-
- ArrayList<UnclosedBranchDescriptor> inputsMerged2 = new ArrayList<UnclosedBranchDescriptor>();
- List<UnclosedBranchDescriptor> result3 = getSingleRootOfStepFunction().openBranches;
- mergeLists(inputsMerged1, result3, inputsMerged2, true);
-
- // handle the data flow branching for the broadcast inputs
- List<UnclosedBranchDescriptor> result = computeUnclosedBranchStackForBroadcastInputs(inputsMerged2);
-
- this.openBranches = (result == null || result.isEmpty()) ? Collections.<UnclosedBranchDescriptor>emptyList() : result;
- }
-
- // --------------------------------------------------------------------------------------------
- // Iteration Specific Traversals
- // --------------------------------------------------------------------------------------------
-
- public void acceptForStepFunction(Visitor<OptimizerNode> visitor) {
- this.singleRoot.accept(visitor);
- }
-
- // --------------------------------------------------------------------------------------------
- // Utility Classes
- // --------------------------------------------------------------------------------------------
-
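- /**
- * Descriptor for the workset iteration's own two inputs (the initial solution set
- * and the initial workset). It only encodes input property requirements and is
- * never instantiated as an actual driver strategy.
- */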
- private static final class WorksetOpDescriptor extends OperatorDescriptorDual {
-
- private WorksetOpDescriptor(FieldList solutionSetKeys) {
- super(solutionSetKeys, null);
- }
-
- @Override
- public DriverStrategy getStrategy() {
- return DriverStrategy.NONE;
- }
-
- @Override
- protected List<GlobalPropertiesPair> createPossibleGlobalProperties() {
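- // the solution set input must be hash-partitioned on the solution set keys (keys1);
- // the workset input carries no requirements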
- RequestedGlobalProperties partitionedGp = new RequestedGlobalProperties();
- partitionedGp.setHashPartitioned(this.keys1);
- return Collections.singletonList(new GlobalPropertiesPair(partitionedGp, new RequestedGlobalProperties()));
- }
-
- @Override
- protected List<LocalPropertiesPair> createPossibleLocalProperties() {
- // all properties are possible
- return Collections.singletonList(new LocalPropertiesPair(
- new RequestedLocalProperties(), new RequestedLocalProperties()));
- }
-
- @Override
- public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlobalProperties requested2,
- GlobalProperties produced1, GlobalProperties produced2) {
- return true;
- }
-
- @Override
- public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
- LocalProperties produced1, LocalProperties produced2) {
- return true;
- }
-
- @Override
- public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public GlobalProperties computeGlobalProperties(GlobalProperties in1, GlobalProperties in2) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
- throw new UnsupportedOperationException();
- }
- }
-
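- /**
- * Utility node that joins the two roots of the step function (the solution set delta
- * and the next workset) into a single root, so that the step function can be
- * traversed and branch-analyzed as one plan.
- */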
- public static class SingleRootJoiner extends TwoInputNode {
-
- SingleRootJoiner() {
- super(new NoOpBinaryUdfOp<Nothing>(new NothingTypeInfo()));
-
- setDegreeOfParallelism(1);
- }
-
- public void setInputs(DagConnection input1, DagConnection input2) {
- this.input1 = input1;
- this.input2 = input2;
- }
-
- @Override
- public String getName() {
- return "Internal Utility Node";
- }
-
- @Override
- protected List<OperatorDescriptorDual> getPossibleProperties() {
- return Collections.emptyList();
- }
-
- @Override
- protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
- // no estimates are needed here
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java
deleted file mode 100644
index 3b05aba..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.base.DeltaIterationBase.WorksetPlaceHolder;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.WorksetPlanNode;
-
-/**
- * The optimizer's internal representation of the workset that is input to a delta (workset) iteration.
- */
-public class WorksetNode extends AbstractPartialSolutionNode {
-
- private final WorksetIterationNode iterationNode;
-
-
- public WorksetNode(WorksetPlaceHolder<?> psph, WorksetIterationNode iterationNode) {
- super(psph);
- this.iterationNode = iterationNode;
- }
-
- // --------------------------------------------------------------------------------------------
-
- public void setCandidateProperties(GlobalProperties gProps, LocalProperties lProps, Channel initialInput) {
- if (this.cachedPlans != null) {
- throw new IllegalStateException();
- } else {
- WorksetPlanNode wspn = new WorksetPlanNode(this, "Workset ("+this.getOperator().getName()+")", gProps, lProps, initialInput);
- this.cachedPlans = Collections.<PlanNode>singletonList(wspn);
- }
- }
-
- public WorksetPlanNode getCurrentWorksetPlanNode() {
- if (this.cachedPlans != null) {
- return (WorksetPlanNode) this.cachedPlans.get(0);
- } else {
- throw new IllegalStateException();
- }
- }
-
- public WorksetIterationNode getIterationNode() {
- return this.iterationNode;
- }
-
- @Override
- public void computeOutputEstimates(DataStatistics statistics) {
- copyEstimates(this.iterationNode.getInitialWorksetPredecessorNode());
- }
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Gets the operator (contract) represented by this workset node.
- *
- * @return The operator.
- */
- @Override
- public WorksetPlaceHolder<?> getOperator() {
- return (WorksetPlaceHolder<?>) super.getOperator();
- }
-
- @Override
- public String getName() {
- return "Workset";
- }
-
- @Override
- public void computeUnclosedBranchStack() {
- if (this.openBranches != null) {
- return;
- }
-
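- // the workset placeholder inherits the open branches from the source of the
- // initial workset, since its data logically originates there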
- DagConnection worksetInput = this.iterationNode.getSecondIncomingConnection();
- OptimizerNode worksetSource = worksetInput.getSource();
-
- addClosedBranches(worksetSource.closedBranchingNodes);
- List<UnclosedBranchDescriptor> fromInput = worksetSource.getBranchesForParent(worksetInput);
- this.openBranches = (fromInput == null || fromInput.isEmpty()) ? Collections.<UnclosedBranchDescriptor>emptyList() : fromInput;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
deleted file mode 100644
index 57ba29d..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
+++ /dev/null
@@ -1,500 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dataproperties;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.util.Utils;
-import org.apache.flink.runtime.io.network.DataExchangeMode;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class represents global properties of the data at a certain point in the plan.
- * Global properties are properties that describe data across different partitions, such as
- * whether the data is hash partitioned, range partitioned, replicated, etc.
- */
-public class GlobalProperties implements Cloneable {
-
- public static final Logger LOG = LoggerFactory.getLogger(GlobalProperties.class);
-
- private PartitioningProperty partitioning; // the type of partitioning
-
- private FieldList partitioningFields; // the fields which are partitioned
-
- private Ordering ordering; // order of the partitioned fields, if it is an ordered (range) partitioning
-
- private Set<FieldSet> uniqueFieldCombinations;
-
- private Partitioner<?> customPartitioner;
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Initializes the global properties with no partitioning.
- */
- public GlobalProperties() {
- this.partitioning = PartitioningProperty.RANDOM_PARTITIONED;
- }
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Sets this global properties to represent a hash partitioning.
- *
- * @param partitionedFields The key fields on which the data is hash partitioned.
- */
- public void setHashPartitioned(FieldList partitionedFields) {
- if (partitionedFields == null) {
- throw new NullPointerException();
- }
-
- this.partitioning = PartitioningProperty.HASH_PARTITIONED;
- this.partitioningFields = partitionedFields;
- this.ordering = null;
- }
-
-
- public void setRangePartitioned(Ordering ordering) {
- if (ordering == null) {
- throw new NullPointerException();
- }
-
- this.partitioning = PartitioningProperty.RANGE_PARTITIONED;
- this.ordering = ordering;
- this.partitioningFields = ordering.getInvolvedIndexes();
- }
-
- public void setAnyPartitioning(FieldList partitionedFields) {
- if (partitionedFields == null) {
- throw new NullPointerException();
- }
-
- this.partitioning = PartitioningProperty.ANY_PARTITIONING;
- this.partitioningFields = partitionedFields;
- this.ordering = null;
- }
-
- public void setRandomPartitioned() {
- this.partitioning = PartitioningProperty.RANDOM_PARTITIONED;
- this.partitioningFields = null;
- this.ordering = null;
- }
-
- public void setFullyReplicated() {
- this.partitioning = PartitioningProperty.FULL_REPLICATION;
- this.partitioningFields = null;
- this.ordering = null;
- }
-
- public void setForcedRebalanced() {
- this.partitioning = PartitioningProperty.FORCED_REBALANCED;
- this.partitioningFields = null;
- this.ordering = null;
- }
-
- public void setCustomPartitioned(FieldList partitionedFields, Partitioner<?> partitioner) {
- if (partitionedFields == null || partitioner == null) {
- throw new NullPointerException();
- }
-
- this.partitioning = PartitioningProperty.CUSTOM_PARTITIONING;
- this.partitioningFields = partitionedFields;
- this.ordering = null;
- this.customPartitioner = partitioner;
- }
-
- public void addUniqueFieldCombination(FieldSet fields) {
- if (fields == null) {
- return;
- }
- if (this.uniqueFieldCombinations == null) {
- this.uniqueFieldCombinations = new HashSet<FieldSet>();
- }
- this.uniqueFieldCombinations.add(fields);
- }
-
- public void clearUniqueFieldCombinations() {
- this.uniqueFieldCombinations = null;
- }
-
- public Set<FieldSet> getUniqueFieldCombination() {
- return this.uniqueFieldCombinations;
- }
-
- public FieldList getPartitioningFields() {
- return this.partitioningFields;
- }
-
- public Ordering getPartitioningOrdering() {
- return this.ordering;
- }
-
- public PartitioningProperty getPartitioning() {
- return this.partitioning;
- }
-
- public Partitioner<?> getCustomPartitioner() {
- return this.customPartitioner;
- }
-
- // --------------------------------------------------------------------------------------------
-
- public boolean isPartitionedOnFields(FieldSet fields) {
- if (this.partitioning.isPartitionedOnKey() && fields.isValidSubset(this.partitioningFields)) {
- return true;
- } else if (this.uniqueFieldCombinations != null) {
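- // if the data is unique on a subset of the fields, every value combination of the
- // fields occurs at most once, so it is trivially co-located within one partition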
- for (FieldSet set : this.uniqueFieldCombinations) {
- if (fields.isValidSubset(set)) {
- return true;
- }
- }
- return false;
- } else {
- return false;
- }
- }
-
- public boolean isExactlyPartitionedOnFields(FieldList fields) {
- return this.partitioning.isPartitionedOnKey() && fields.isExactMatch(this.partitioningFields);
- }
-
- public boolean matchesOrderedPartitioning(Ordering o) {
- if (this.partitioning == PartitioningProperty.RANGE_PARTITIONED) {
- if (this.ordering.getNumberOfFields() > o.getNumberOfFields()) {
- return false;
- }
-
- for (int i = 0; i < this.ordering.getNumberOfFields(); i++) {
- if (this.ordering.getFieldNumber(i) != o.getFieldNumber(i)) {
- return false;
- }
-
- // if the given ordering requests no order for this field, anything matches
- final Order oo = o.getOrder(i);
- final Order to = this.ordering.getOrder(i);
- if (oo != Order.NONE) {
- if (oo == Order.ANY) {
- // if any order is requested, any not NONE order is good
- if (to == Order.NONE) {
- return false;
- }
- } else if (oo != to) {
- // the orders must be equal
- return false;
- }
- }
- }
- return true;
- } else {
- return false;
- }
- }
-
- public boolean isFullyReplicated() {
- return this.partitioning == PartitioningProperty.FULL_REPLICATION;
- }
-
- /**
- * Checks whether the properties in this object are trivial, i.e. contain only default values.
- */
- public boolean isTrivial() {
- return partitioning == PartitioningProperty.RANDOM_PARTITIONED;
- }
-
- /**
- * Resets this object to the initial state, in which no properties are given.
- */
- public void reset() {
- this.partitioning = PartitioningProperty.RANDOM_PARTITIONED;
- this.ordering = null;
- this.partitioningFields = null;
- }
-
- /**
- * Filters these GlobalProperties by the fields that are forwarded to the output
- * as described by the SemanticProperties.
- *
- * @param props The semantic properties holding information about forwarded fields.
- * @param input The index of the input.
- * @return The filtered GlobalProperties
- */
- public GlobalProperties filterBySemanticProperties(SemanticProperties props, int input) {
-
- if (props == null) {
- throw new NullPointerException("SemanticProperties may not be null.");
- }
-
- GlobalProperties gp = new GlobalProperties();
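- // start from trivial properties and add back only what is fully forwarded to the output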
-
- // filter partitioning
- switch(this.partitioning) {
- case RANGE_PARTITIONED:
- // check if ordering is preserved
- Ordering newOrdering = new Ordering();
- for (int i = 0; i < this.ordering.getInvolvedIndexes().size(); i++) {
- int sourceField = this.ordering.getInvolvedIndexes().get(i);
- FieldSet targetField = props.getForwardingTargetFields(input, sourceField);
-
- if (targetField == null || targetField.size() == 0) {
- // partitioning is destroyed
- newOrdering = null;
- break;
- } else {
- // use the first target field for now; ideally this would use field equivalence sets
- if (targetField.size() > 1) {
- LOG.warn("Found that a field is forwarded to more than one target field in " +
- "semantic forwarded field information. Will only use the field with the lowest index.");
- }
- newOrdering.appendOrdering(targetField.toArray()[0], this.ordering.getType(i), this.ordering.getOrder(i));
- }
- }
- if (newOrdering != null) {
- gp.partitioning = PartitioningProperty.RANGE_PARTITIONED;
- gp.ordering = newOrdering;
- gp.partitioningFields = newOrdering.getInvolvedIndexes();
- }
- break;
- case HASH_PARTITIONED:
- case ANY_PARTITIONING:
- case CUSTOM_PARTITIONING:
- FieldList newPartitioningFields = new FieldList();
- for (int sourceField : this.partitioningFields) {
- FieldSet targetField = props.getForwardingTargetFields(input, sourceField);
-
- if (targetField == null || targetField.size() == 0) {
- newPartitioningFields = null;
- break;
- } else {
- // use the first target field for now; ideally this would use field equivalence sets
- if (targetField.size() > 1) {
- LOG.warn("Found that a field is forwarded to more than one target field in " +
- "semantic forwarded field information. Will only use the field with the lowest index.");
- }
- newPartitioningFields = newPartitioningFields.addField(targetField.toArray()[0]);
- }
- }
- if (newPartitioningFields != null) {
- gp.partitioning = this.partitioning;
- gp.partitioningFields = newPartitioningFields;
- gp.customPartitioner = this.customPartitioner;
- }
- break;
- case FORCED_REBALANCED:
- case FULL_REPLICATION:
- case RANDOM_PARTITIONED:
- gp.partitioning = this.partitioning;
- break;
- default:
- throw new RuntimeException("Unknown partitioning type.");
- }
-
- // filter unique field combinations
- if (this.uniqueFieldCombinations != null) {
- Set<FieldSet> newUniqueFieldCombinations = new HashSet<FieldSet>();
- for (FieldSet fieldCombo : this.uniqueFieldCombinations) {
- FieldSet newFieldCombo = new FieldSet();
- for (Integer sourceField : fieldCombo) {
- FieldSet targetField = props.getForwardingTargetFields(input, sourceField);
-
- if (targetField == null || targetField.size() == 0) {
- newFieldCombo = null;
- break;
- } else {
- // use the first target field for now; ideally this would use field equivalence sets
- if (targetField.size() > 1) {
- LOG.warn("Found that a field is forwarded to more than one target field in " +
- "semantic forwarded field information. Will only use the field with the lowest index.");
- }
- newFieldCombo = newFieldCombo.addField(targetField.toArray()[0]);
- }
- }
- if (newFieldCombo != null) {
- newUniqueFieldCombinations.add(newFieldCombo);
- }
- }
- if (!newUniqueFieldCombinations.isEmpty()) {
- gp.uniqueFieldCombinations = newUniqueFieldCombinations;
- }
- }
-
- return gp;
- }
-
-
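- /**
- * Sets the ship strategy of the given channel such that these global properties
- * are established at the channel's target, e.g. by hash partitioning, range
- * partitioning, broadcasting, or simply forwarding.
- */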
- public void parameterizeChannel(Channel channel, boolean globalDopChange,
- ExecutionMode exchangeMode, boolean breakPipeline) {
-
- ShipStrategyType shipType;
- FieldList partitionKeys;
- boolean[] sortDirection;
- Partitioner<?> partitioner;
-
- switch (this.partitioning) {
- case RANDOM_PARTITIONED:
- shipType = globalDopChange ? ShipStrategyType.PARTITION_RANDOM : ShipStrategyType.FORWARD;
- partitionKeys = null;
- sortDirection = null;
- partitioner = null;
- break;
-
- case FULL_REPLICATION:
- shipType = ShipStrategyType.BROADCAST;
- partitionKeys = null;
- sortDirection = null;
- partitioner = null;
- break;
-
- case ANY_PARTITIONING:
- case HASH_PARTITIONED:
- shipType = ShipStrategyType.PARTITION_HASH;
- partitionKeys = Utils.createOrderedFromSet(this.partitioningFields);
- sortDirection = null;
- partitioner = null;
- break;
-
- case RANGE_PARTITIONED:
- shipType = ShipStrategyType.PARTITION_RANGE;
- partitionKeys = this.ordering.getInvolvedIndexes();
- sortDirection = this.ordering.getFieldSortDirections();
- partitioner = null;
- break;
-
- case FORCED_REBALANCED:
- shipType = ShipStrategyType.PARTITION_RANDOM;
- partitionKeys = null;
- sortDirection = null;
- partitioner = null;
- break;
-
- case CUSTOM_PARTITIONING:
- shipType = ShipStrategyType.PARTITION_CUSTOM;
- partitionKeys = this.partitioningFields;
- sortDirection = null;
- partitioner = this.customPartitioner;
- break;
-
- default:
- throw new CompilerException("Unsupported partitioning strategy");
- }
-
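- // choose pipelined vs. batch data exchange based on the execution mode,
- // the ship strategy, and whether this channel must break the pipeline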
- DataExchangeMode exMode = DataExchangeMode.select(exchangeMode, shipType, breakPipeline);
- channel.setShipStrategy(shipType, partitionKeys, sortDirection, partitioner, exMode);
- }
-
- // ------------------------------------------------------------------------
-
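- // note: hashCode() uses only a subset of the fields compared in equals(), which
- // is consistent (equal objects hash equally) but less discriminating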
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((partitioning == null) ? 0 : partitioning.ordinal());
- result = prime * result + ((partitioningFields == null) ? 0 : partitioningFields.hashCode());
- result = prime * result + ((ordering == null) ? 0 : ordering.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof GlobalProperties) {
- final GlobalProperties other = (GlobalProperties) obj;
- return (this.partitioning == other.partitioning)
- && (this.ordering == other.ordering || (this.ordering != null && this.ordering.equals(other.ordering)))
- && (this.partitioningFields == other.partitioningFields ||
- (this.partitioningFields != null && this.partitioningFields.equals(other.partitioningFields)))
- && (this.uniqueFieldCombinations == other.uniqueFieldCombinations ||
- (this.uniqueFieldCombinations != null && this.uniqueFieldCombinations.equals(other.uniqueFieldCombinations)));
- } else {
- return false;
- }
- }
-
- @Override
- public String toString() {
- final StringBuilder bld = new StringBuilder(
- "GlobalProperties [partitioning=" + partitioning +
- (this.partitioningFields == null ? "" : ", on fields " + this.partitioningFields) +
- (this.ordering == null ? "" : ", with ordering " + this.ordering));
-
- if (this.uniqueFieldCombinations == null) {
- bld.append(']');
- } else {
- bld.append(" - Unique field groups: ");
- bld.append(this.uniqueFieldCombinations);
- bld.append(']');
- }
- return bld.toString();
- }
-
- @Override
- public GlobalProperties clone() {
- final GlobalProperties newProps = new GlobalProperties();
- newProps.partitioning = this.partitioning;
- newProps.partitioningFields = this.partitioningFields;
- newProps.ordering = this.ordering;
- newProps.customPartitioner = this.customPartitioner;
- newProps.uniqueFieldCombinations = this.uniqueFieldCombinations == null ? null : new HashSet<FieldSet>(this.uniqueFieldCombinations);
- return newProps;
- }
-
- // --------------------------------------------------------------------------------------------
-
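- /**
- * Picks the global properties that may be assumed for the combination of the two
- * inputs, preferring the more specific side: an established ordering wins over
- * plain partitioning fields, which win over unique field combinations. If both
- * sides are fully replicated, nothing can be assumed.
- */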
- public static GlobalProperties combine(GlobalProperties gp1, GlobalProperties gp2) {
- if (gp1.isFullyReplicated()) {
- if (gp2.isFullyReplicated()) {
- return new GlobalProperties();
- } else {
- return gp2;
- }
- } else if (gp2.isFullyReplicated()) {
- return gp1;
- } else if (gp1.ordering != null) {
- return gp1;
- } else if (gp2.ordering != null) {
- return gp2;
- } else if (gp1.partitioningFields != null) {
- return gp1;
- } else if (gp2.partitioningFields != null) {
- return gp2;
- } else if (gp1.uniqueFieldCombinations != null) {
- return gp1;
- } else if (gp2.uniqueFieldCombinations != null) {
- return gp2;
- } else if (gp1.getPartitioning().isPartitioned()) {
- return gp1;
- } else if (gp2.getPartitioning().isPartitioned()) {
- return gp2;
- } else {
- return gp1;
- }
- }
-}