You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:07:17 UTC
[38/53] [abbrv] flink git commit: [optimizer] Rename optimizer
project to "flink-optimizer" (previously flink-compiler)
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
deleted file mode 100644
index dbe04f4..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.common.distributions.DataDistribution;
-import org.apache.flink.api.common.operators.GenericDataSinkBase;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.costs.CostEstimator;
-import org.apache.flink.optimizer.dataproperties.InterestingProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.util.Visitor;
-
-/**
- * The Optimizer representation of a data sink.
- */
-public class DataSinkNode extends OptimizerNode {
-
- protected DagConnection input; // The input edge
-
- /**
- * Creates a new DataSinkNode for the given sink operator.
- *
- * @param sink The data sink contract object.
- */
- public DataSinkNode(GenericDataSinkBase<?> sink) {
- super(sink);
- }
-
- // --------------------------------------------------------------------------------------
-
- /**
- * Gets the input of the sink.
- *
- * @return The input connection.
- */
- public DagConnection getInputConnection() {
- return this.input;
- }
-
- /**
- * Gets the predecessor of this node.
- *
- * @return The predecessor, or null, if no predecessor has been set.
- */
- public OptimizerNode getPredecessorNode() {
- if(this.input != null) {
- return input.getSource();
- } else {
- return null;
- }
- }
-
- /**
- * Gets the operator for which this optimizer sink node was created.
- *
- * @return The node's underlying operator.
- */
- @Override
- public GenericDataSinkBase<?> getOperator() {
- return (GenericDataSinkBase<?>) super.getOperator();
- }
-
- @Override
- public String getName() {
- return "Data Sink";
- }
-
- @Override
- public List<DagConnection> getIncomingConnections() {
- return Collections.singletonList(this.input);
- }
-
- /**
- * Gets all outgoing connections, which is an empty set for the data sink.
- *
- * @return An empty list.
- */
- @Override
- public List<DagConnection> getOutgoingConnections() {
- return Collections.emptyList();
- }
-
- @Override
- public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, ExecutionMode defaultExchangeMode) {
- Operator<?> children = getOperator().getInput();
-
- final OptimizerNode pred;
- final DagConnection conn;
-
- pred = contractToNode.get(children);
- conn = new DagConnection(pred, this, defaultExchangeMode);
-
- // create the connection and add it
- this.input = conn;
- pred.addOutgoingConnection(conn);
- }
-
- /**
- * Computes the estimated outputs for the data sink. Since the sink does not modify anything, it simply
- * copies the output estimates from its direct predecessor.
- */
- @Override
- protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
- this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords();
- this.estimatedOutputSize = getPredecessorNode().getEstimatedOutputSize();
- }
-
- @Override
- public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
- final InterestingProperties iProps = new InterestingProperties();
-
- {
- final Ordering partitioning = getOperator().getPartitionOrdering();
- final DataDistribution dataDist = getOperator().getDataDistribution();
- final RequestedGlobalProperties partitioningProps = new RequestedGlobalProperties();
- if (partitioning != null) {
- if(dataDist != null) {
- partitioningProps.setRangePartitioned(partitioning, dataDist);
- } else {
- partitioningProps.setRangePartitioned(partitioning);
- }
- iProps.addGlobalProperties(partitioningProps);
- }
- iProps.addGlobalProperties(partitioningProps);
- }
-
- {
- final Ordering localOrder = getOperator().getLocalOrder();
- final RequestedLocalProperties orderProps = new RequestedLocalProperties();
- if (localOrder != null) {
- orderProps.setOrdering(localOrder);
- }
- iProps.addLocalProperties(orderProps);
- }
-
- this.input.setInterestingProperties(iProps);
- }
-
- // --------------------------------------------------------------------------------------------
- // Branch Handling
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void computeUnclosedBranchStack() {
- if (this.openBranches != null) {
- return;
- }
-
- // we need to track open branches even in the sinks, because they get "closed" when
- // we build a single "root" for the data flow plan
- addClosedBranches(getPredecessorNode().closedBranchingNodes);
- this.openBranches = getPredecessorNode().getBranchesForParent(this.input);
- }
-
- @Override
- protected List<UnclosedBranchDescriptor> getBranchesForParent(DagConnection parent) {
- // return our own stack of open branches, because nothing is added
- return this.openBranches;
- }
-
- // --------------------------------------------------------------------------------------------
- // Recursive Optimization
- // --------------------------------------------------------------------------------------------
-
- @Override
- public List<PlanNode> getAlternativePlans(CostEstimator estimator) {
- // check if we have a cached version
- if (this.cachedPlans != null) {
- return this.cachedPlans;
- }
-
- // calculate alternative sub-plans for predecessor
- List<? extends PlanNode> subPlans = getPredecessorNode().getAlternativePlans(estimator);
- List<PlanNode> outputPlans = new ArrayList<PlanNode>();
-
- final int dop = getParallelism();
- final int inDop = getPredecessorNode().getParallelism();
-
- final ExecutionMode executionMode = this.input.getDataExchangeMode();
- final boolean dopChange = dop != inDop;
- final boolean breakPipeline = this.input.isBreakingPipeline();
-
- InterestingProperties ips = this.input.getInterestingProperties();
- for (PlanNode p : subPlans) {
- for (RequestedGlobalProperties gp : ips.getGlobalProperties()) {
- for (RequestedLocalProperties lp : ips.getLocalProperties()) {
- Channel c = new Channel(p);
- gp.parameterizeChannel(c, dopChange, executionMode, breakPipeline);
- lp.parameterizeChannel(c);
- c.setRequiredLocalProps(lp);
- c.setRequiredGlobalProps(gp);
-
- // no need to check whether the created properties meet what we need in case
- // of ordering or global ordering, because the only interesting properties we have
- // are what we require
- outputPlans.add(new SinkPlanNode(this, "DataSink ("+this.getOperator().getName()+")" ,c));
- }
- }
- }
-
- // cost and prune the plans
- for (PlanNode node : outputPlans) {
- estimator.costOperator(node);
- }
- prunePlanAlternatives(outputPlans);
-
- this.cachedPlans = outputPlans;
- return outputPlans;
- }
-
- // --------------------------------------------------------------------------------------------
- // Function Annotation Handling
- // --------------------------------------------------------------------------------------------
-
- @Override
- public SemanticProperties getSemanticProperties() {
- return new EmptySemanticProperties();
- }
-
- // --------------------------------------------------------------------------------------------
- // Miscellaneous
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void accept(Visitor<OptimizerNode> visitor) {
- if (visitor.preVisit(this)) {
- if (getPredecessorNode() != null) {
- getPredecessorNode().accept(visitor);
- } else {
- throw new CompilerException();
- }
- visitor.postVisit(this);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java
deleted file mode 100644
index e4b35b7..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.io.FileInputFormat;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.NonParallelInput;
-import org.apache.flink.api.common.io.ReplicatingInputFormat;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.common.operators.GenericDataSourceBase;
-import org.apache.flink.api.common.operators.GenericDataSourceBase.SplitDataProperties;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.costs.CostEstimator;
-import org.apache.flink.optimizer.costs.Costs;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SourcePlanNode;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.Visitor;
-
-/**
- * The optimizer's internal representation of a data source.
- */
-public class DataSourceNode extends OptimizerNode {
-
- private final boolean sequentialInput;
-
- private final boolean replicatedInput;
-
- private GlobalProperties gprops;
-
- private LocalProperties lprops;
-
- /**
- * Creates a new DataSourceNode for the given contract.
- *
- * @param pactContract
- * The data source contract object.
- */
- public DataSourceNode(GenericDataSourceBase<?, ?> pactContract) {
- super(pactContract);
-
- if (pactContract.getUserCodeWrapper().getUserCodeClass() == null) {
- throw new IllegalArgumentException("Input format has not been set.");
- }
-
- if (NonParallelInput.class.isAssignableFrom(pactContract.getUserCodeWrapper().getUserCodeClass())) {
- setDegreeOfParallelism(1);
- this.sequentialInput = true;
- } else {
- this.sequentialInput = false;
- }
-
- this.replicatedInput = ReplicatingInputFormat.class.isAssignableFrom(
- pactContract.getUserCodeWrapper().getUserCodeClass());
-
- this.gprops = new GlobalProperties();
- this.lprops = new LocalProperties();
-
- SplitDataProperties<?> splitProps = pactContract.getSplitDataProperties();
-
- if(replicatedInput) {
- this.gprops.setFullyReplicated();
- this.lprops = new LocalProperties();
- } else if (splitProps != null) {
- // configure data properties of data source using split properties
- setDataPropertiesFromSplitProperties(splitProps);
- }
-
- }
-
- /**
- * Gets the contract object for this data source node.
- *
- * @return The contract.
- */
- @Override
- public GenericDataSourceBase<?, ?> getOperator() {
- return (GenericDataSourceBase<?, ?>) super.getOperator();
- }
-
- @Override
- public String getName() {
- return "Data Source";
- }
-
- @Override
- public void setDegreeOfParallelism(int degreeOfParallelism) {
- // if unsplittable, parallelism remains at 1
- if (!this.sequentialInput) {
- super.setDegreeOfParallelism(degreeOfParallelism);
- }
- }
-
- @Override
- public List<DagConnection> getIncomingConnections() {
- return Collections.<DagConnection>emptyList();
- }
-
- @Override
- public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, ExecutionMode defaultDataExchangeMode) {}
-
- @Override
- protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
- // see, if we have a statistics object that can tell us a bit about the file
- if (statistics != null) {
- // instantiate the input format, as this is needed by the statistics
- InputFormat<?, ?> format;
- String inFormatDescription = "<unknown>";
-
- try {
- format = getOperator().getFormatWrapper().getUserCodeObject();
- Configuration config = getOperator().getParameters();
- format.configure(config);
- }
- catch (Throwable t) {
- if (Optimizer.LOG.isWarnEnabled()) {
- Optimizer.LOG.warn("Could not instantiate InputFormat to obtain statistics."
- + " Limited statistics will be available.", t);
- }
- return;
- }
- try {
- inFormatDescription = format.toString();
- }
- catch (Throwable t) {
- // we can ignore this error, as it only prevents us from using a cosmetic string
- }
-
- // first of all, get the statistics from the cache
- final String statisticsKey = getOperator().getStatisticsKey();
- final BaseStatistics cachedStatistics = statistics.getBaseStatistics(statisticsKey);
-
- BaseStatistics bs = null;
- try {
- bs = format.getStatistics(cachedStatistics);
- }
- catch (Throwable t) {
- if (Optimizer.LOG.isWarnEnabled()) {
- Optimizer.LOG.warn("Error obtaining statistics from input format: " + t.getMessage(), t);
- }
- }
-
- if (bs != null) {
- final long len = bs.getTotalInputSize();
- if (len == BaseStatistics.SIZE_UNKNOWN) {
- if (Optimizer.LOG.isInfoEnabled()) {
- Optimizer.LOG.info("Compiler could not determine the size of input '" + inFormatDescription + "'. Using default estimates.");
- }
- }
- else if (len >= 0) {
- this.estimatedOutputSize = len;
- }
-
- final long card = bs.getNumberOfRecords();
- if (card != BaseStatistics.NUM_RECORDS_UNKNOWN) {
- this.estimatedNumRecords = card;
- }
- }
- }
- }
-
- @Override
- public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
- // no children, so nothing to compute
- }
-
- @Override
- public void computeUnclosedBranchStack() {
- // because there are no inputs, there are no unclosed branches.
- this.openBranches = Collections.emptyList();
- }
-
- @Override
- public List<PlanNode> getAlternativePlans(CostEstimator estimator) {
- if (this.cachedPlans != null) {
- return this.cachedPlans;
- }
-
- SourcePlanNode candidate = new SourcePlanNode(this, "DataSource ("+this.getOperator().getName()+")",
- this.gprops, this.lprops);
-
- if(!replicatedInput) {
- candidate.updatePropertiesWithUniqueSets(getUniqueFields());
-
- final Costs costs = new Costs();
- if (FileInputFormat.class.isAssignableFrom(getOperator().getFormatWrapper().getUserCodeClass()) &&
- this.estimatedOutputSize >= 0) {
- estimator.addFileInputCost(this.estimatedOutputSize, costs);
- }
- candidate.setCosts(costs);
- } else {
- // replicated input
- final Costs costs = new Costs();
- InputFormat<?,?> inputFormat =
- ((ReplicatingInputFormat<?,?>) getOperator().getFormatWrapper().getUserCodeObject()).getReplicatedInputFormat();
- if (FileInputFormat.class.isAssignableFrom(inputFormat.getClass()) &&
- this.estimatedOutputSize >= 0) {
- estimator.addFileInputCost(this.estimatedOutputSize * this.getParallelism(), costs);
- }
- candidate.setCosts(costs);
- }
-
- // since there is only a single plan for the data-source, return a list with that element only
- List<PlanNode> plans = new ArrayList<PlanNode>(1);
- plans.add(candidate);
-
- this.cachedPlans = plans;
- return plans;
- }
-
- @Override
- public SemanticProperties getSemanticProperties() {
- return new EmptySemanticProperties();
- }
-
- @Override
- public void accept(Visitor<OptimizerNode> visitor) {
- if (visitor.preVisit(this)) {
- visitor.postVisit(this);
- }
- }
-
- private void setDataPropertiesFromSplitProperties(SplitDataProperties splitProps) {
-
- // set global properties
- int[] partitionKeys = splitProps.getSplitPartitionKeys();
- Partitioner<?> partitioner = splitProps.getSplitPartitioner();
-
- if(partitionKeys != null && partitioner != null) {
- this.gprops.setCustomPartitioned(new FieldList(partitionKeys), partitioner);
- }
- else if(partitionKeys != null) {
- this.gprops.setAnyPartitioning(new FieldList(partitionKeys));
- }
- // set local properties
- int[] groupingKeys = splitProps.getSplitGroupKeys();
- Ordering ordering = splitProps.getSplitOrder();
-
- // more than one split per source tasks possible.
- // adapt split grouping and sorting
- if(ordering != null) {
-
- // sorting falls back to grouping because a source can read multiple,
- // randomly assigned splits
- groupingKeys = ordering.getFieldPositions();
- }
-
- if(groupingKeys != null && partitionKeys != null) {
- // check if grouping is also valid across splits, i.e., whether grouping keys are
- // valid superset of partition keys
- boolean allFieldsIncluded = true;
- for(int i : partitionKeys) {
- boolean fieldIncluded = false;
- for(int j : groupingKeys) {
- if(i == j) {
- fieldIncluded = true;
- break;
- }
- }
- if(!fieldIncluded) {
- allFieldsIncluded = false;
- break;
- }
- }
- if (allFieldsIncluded) {
- this.lprops = LocalProperties.forGrouping(new FieldList(groupingKeys));
- } else {
- this.lprops = new LocalProperties();
- }
-
- } else {
- this.lprops = new LocalProperties();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/EstimateProvider.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/EstimateProvider.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/EstimateProvider.java
deleted file mode 100644
index 482951b..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/EstimateProvider.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-/**
- * Methods for operators / connections that provide estimates about data size and
- * characteristics.
- */
-public interface EstimateProvider {
-
- /**
- * Gets the estimated output size from this node.
- *
- * @return The estimated output size.
- */
- long getEstimatedOutputSize();
-
- /**
- * Gets the estimated number of records in the output of this node.
- *
- * @return The estimated number of records.
- */
- long getEstimatedNumRecords();
-
- /**
- * Gets the estimated number of bytes per record.
- *
- * @return The estimated number of bytes per record.
- */
- float getEstimatedAvgWidthPerOutputRecord();
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java
deleted file mode 100644
index 118ddc8..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
-import org.apache.flink.api.common.operators.base.FilterOperatorBase;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.operators.FilterDescriptor;
-import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
-
-/**
- * The optimizer's internal representation of a <i>Filter</i> operator node.
- */
-public class FilterNode extends SingleInputNode {
-
- private final List<OperatorDescriptorSingle> possibleProperties;
-
- public FilterNode(FilterOperatorBase<?, ?> operator) {
- super(operator);
- this.possibleProperties = Collections.<OperatorDescriptorSingle>singletonList(new FilterDescriptor());
- }
-
- @Override
- public FilterOperatorBase<?, ?> getOperator() {
- return (FilterOperatorBase<?, ?>) super.getOperator();
- }
-
- @Override
- public String getName() {
- return "Filter";
- }
-
- @Override
- public SemanticProperties getSemanticProperties() {
- return new SingleInputSemanticProperties.AllFieldsForwardedProperties();
- }
-
- @Override
- protected List<OperatorDescriptorSingle> getPossibleProperties() {
- return this.possibleProperties;
- }
-
- /**
- * Computes the estimates for the Filter operator. Since it applies a filter on the data we assume a cardinality
- * decrease. To give the system a hint at data decrease, we use a default magic number to indicate a 0.5 decrease.
- */
- @Override
- protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
- this.estimatedNumRecords = (long) (getPredecessorNode().getEstimatedNumRecords() * 0.5);
- this.estimatedOutputSize = (long) (getPredecessorNode().getEstimatedOutputSize() * 0.5);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java
deleted file mode 100644
index f713d56..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.base.FlatMapOperatorBase;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.operators.FlatMapDescriptor;
-import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
-
-/**
- * The optimizer's internal representation of a <i>FlatMap</i> operator node.
- */
-public class FlatMapNode extends SingleInputNode {
-
- private final List<OperatorDescriptorSingle> possibleProperties;
-
- public FlatMapNode(FlatMapOperatorBase<?, ?, ?> operator) {
- super(operator);
-
- this.possibleProperties = Collections.<OperatorDescriptorSingle>singletonList(new FlatMapDescriptor());
- }
-
- @Override
- public FlatMapOperatorBase<?, ?, ?> getOperator() {
- return (FlatMapOperatorBase<?, ?, ?>) super.getOperator();
- }
-
- @Override
- public String getName() {
- return "FlatMap";
- }
-
- @Override
- protected List<OperatorDescriptorSingle> getPossibleProperties() {
- return this.possibleProperties;
- }
-
- /**
- * Computes the estimates for the FlatMap operator. Since it un-nests, we assume a cardinality
- * increase. To give the system a hint at data increase, we take a default magic number of a 5 times increase.
- */
- @Override
- protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
- this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords() * 5;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
deleted file mode 100644
index 564c0d3..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.operators.AllGroupCombineProperties;
-import org.apache.flink.optimizer.operators.GroupCombineProperties;
-import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
-
-import java.util.Collections;
-import java.util.List;
-
-/**
- * The optimizer representation of a <i>GroupCombine</i> operation.
- */
-public class GroupCombineNode extends SingleInputNode {
-
- private final List<OperatorDescriptorSingle> possibleProperties;
-
- /**
- * Creates a new optimizer node for the given operator.
- *
- * @param operator The group combine operation.
- */
- public GroupCombineNode(GroupCombineOperatorBase<?, ?, ?> operator) {
- super(operator);
-
- if (this.keys == null) {
- // case of a key-less reducer. force a parallelism of 1
- setDegreeOfParallelism(1);
- }
-
- this.possibleProperties = initPossibleProperties();
- }
-
- private List<OperatorDescriptorSingle> initPossibleProperties() {
-
- // check if we can work with a grouping (simple reducer), or if we need ordering because of a group order
- Ordering groupOrder = getOperator().getGroupOrder();
- if (groupOrder != null && groupOrder.getNumberOfFields() == 0) {
- groupOrder = null;
- }
-
- OperatorDescriptorSingle props = (this.keys == null ?
- new AllGroupCombineProperties() :
- new GroupCombineProperties(this.keys, groupOrder));
-
- return Collections.singletonList(props);
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * Gets the operator represented by this optimizer node.
- *
- * @return The operator represented by this optimizer node.
- */
- @Override
- public GroupCombineOperatorBase<?, ?, ?> getOperator() {
- return (GroupCombineOperatorBase<?, ?, ?>) super.getOperator();
- }
-
- @Override
- public String getName() {
- return "GroupCombine";
- }
-
- @Override
- protected List<OperatorDescriptorSingle> getPossibleProperties() {
- return this.possibleProperties;
- }
-
- // --------------------------------------------------------------------------------------------
- // Estimates
- // --------------------------------------------------------------------------------------------
-
- @Override
- protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
- // no real estimates possible for a reducer.
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
deleted file mode 100644
index 77acae5..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.operators.AllGroupReduceProperties;
-import org.apache.flink.optimizer.operators.AllGroupWithPartialPreGroupProperties;
-import org.apache.flink.optimizer.operators.GroupReduceProperties;
-import org.apache.flink.optimizer.operators.GroupReduceWithCombineProperties;
-import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
-import org.apache.flink.configuration.Configuration;
-
-/**
- * The optimizer representation of a <i>GroupReduce</i> operation.
- *
- * <p>Each node offers a single operator descriptor. It is selected when the node is created,
- * based on whether grouping keys are present and on whether a combiner is to be used.
- */
-public class GroupReduceNode extends SingleInputNode {
-
- private final List<OperatorDescriptorSingle> possibleProperties;
-
- /** Created lazily by {@link #getCombinerUtilityNode()}. */
- private GroupReduceNode combinerUtilityNode;
-
- /**
- * Creates a new optimizer node for the given operator.
- *
- * @param operator The reduce operation.
- */
- public GroupReduceNode(GroupReduceOperatorBase<?, ?, ?> operator) {
- super(operator);
-
- if (this.keys == null) {
- // case of a key-less reducer. force a parallelism of 1
- setDegreeOfParallelism(1);
- }
-
- this.possibleProperties = initPossibleProperties(operator.getCustomPartitioner());
- }
-
- /**
- * Creates a node from an existing reducer node. The new node offers no operator
- * descriptors of its own (its list of possible properties is empty).
- *
- * @param reducerToCopyForCombiner The reducer node to copy.
- */
- public GroupReduceNode(GroupReduceNode reducerToCopyForCombiner) {
- super(reducerToCopyForCombiner);
-
- this.possibleProperties = Collections.emptyList();
- }
-
- /**
- * Selects the single operator descriptor offered by this node.
- *
- * @param customPartitioner The custom partitioner of the operator; passed on to the keyed descriptors.
- * @return A singleton list with the selected descriptor.
- * @throws CompilerException If the local strategy hint of the operator is not recognized.
- */
- private List<OperatorDescriptorSingle> initPossibleProperties(Partitioner<?> customPartitioner) {
- final boolean useCombiner = shouldUseCombiner();
-
- // check if we can work with a grouping (simple reducer), or if we need ordering because of a group order.
- // getOperator() already returns a GroupReduceOperatorBase, so no type check or cast is needed.
- Ordering groupOrder = getOperator().getGroupOrder();
- if (groupOrder != null && groupOrder.getNumberOfFields() == 0) {
- groupOrder = null;
- }
-
- final OperatorDescriptorSingle props;
- if (this.keys == null) {
- if (useCombiner) {
- props = new AllGroupWithPartialPreGroupProperties();
- } else {
- props = new AllGroupReduceProperties();
- }
- } else if (useCombiner) {
- props = new GroupReduceWithCombineProperties(this.keys, groupOrder, customPartitioner);
- } else {
- props = new GroupReduceProperties(this.keys, groupOrder, customPartitioner);
- }
-
- return Collections.singletonList(props);
- }
-
- /**
- * Decides whether a combiner is used, giving an internal local strategy hint precedence
- * over the combinable flag of the user function.
- *
- * @return True, if a combiner should be used, false otherwise.
- * @throws CompilerException If a local strategy hint is set but not recognized.
- */
- private boolean shouldUseCombiner() {
- // see if an internal hint dictates the strategy to use
- final Configuration conf = getOperator().getParameters();
- final String localStrategy = conf.getString(Optimizer.HINT_LOCAL_STRATEGY, null);
-
- if (localStrategy == null) {
- return isCombineable();
- }
- if (Optimizer.HINT_LOCAL_STRATEGY_SORT.equals(localStrategy)) {
- return false;
- }
- if (Optimizer.HINT_LOCAL_STRATEGY_COMBINING_SORT.equals(localStrategy)) {
- // the hint wins even if the function is not marked combinable; only a warning is logged
- if (!isCombineable()) {
- Optimizer.LOG.warn("Strategy hint for GroupReduce '" + getOperator().getName() +
- "' requires combinable reduce, but user function is not marked combinable.");
- }
- return true;
- }
-
- throw new CompilerException("Invalid local strategy hint for GroupReduce '" + getOperator().getName() +
- "': " + localStrategy);
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * Gets the operator represented by this optimizer node.
- *
- * @return The operator represented by this optimizer node.
- */
- @Override
- public GroupReduceOperatorBase<?, ?, ?> getOperator() {
- return (GroupReduceOperatorBase<?, ?, ?>) super.getOperator();
- }
-
- /**
- * Checks whether the operator of this node reports itself as combinable.
- *
- * @return True, if the operator is combinable, false otherwise.
- */
- public boolean isCombineable() {
- return getOperator().isCombinable();
- }
-
- @Override
- public String getName() {
- return "GroupReduce";
- }
-
- @Override
- protected List<OperatorDescriptorSingle> getPossibleProperties() {
- return this.possibleProperties;
- }
-
- // --------------------------------------------------------------------------------------------
- // Estimates
- // --------------------------------------------------------------------------------------------
-
- @Override
- protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
- // no real estimates possible for a reducer.
- }
-
- /**
- * Gets the utility node for the combiner, creating it on the first call. The created node
- * takes its estimated output size and record count from this node's predecessor.
- *
- * @return The combiner utility node.
- */
- public GroupReduceNode getCombinerUtilityNode() {
- if (this.combinerUtilityNode == null) {
- this.combinerUtilityNode = new GroupReduceNode(this);
-
- // we conservatively assume the combiner returns the same data size as it consumes
- this.combinerUtilityNode.estimatedOutputSize = getPredecessorNode().getEstimatedOutputSize();
- this.combinerUtilityNode.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords();
- }
- return this.combinerUtilityNode;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/InterestingPropertiesClearer.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/InterestingPropertiesClearer.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/InterestingPropertiesClearer.java
deleted file mode 100644
index 1fdae51..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/InterestingPropertiesClearer.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import org.apache.flink.util.Visitor;
-
-/**
- * Visitor that clears the interesting properties of the nodes it visits. The traversal only
- * continues below nodes that actually had interesting properties set.
- */
-final class InterestingPropertiesClearer implements Visitor<OptimizerNode> {
-
- /** The shared instance of this visitor. */
- static final InterestingPropertiesClearer INSTANCE = new InterestingPropertiesClearer();
-
- /**
- * Clears the interesting properties of the given node, if it has any.
- *
- * @param visitable The node being visited.
- * @return True, if the node had interesting properties (which are now cleared), false otherwise.
- */
- @Override
- public boolean preVisit(OptimizerNode visitable) {
- if (visitable.getInterestingProperties() == null) {
- // nothing to clear on this node
- return false;
- }
-
- visitable.clearInterestingProperties();
- return true;
- }
-
- /**
- * Does nothing; all work happens in {@link #preVisit(OptimizerNode)}.
- *
- * @param visitable The node being visited.
- */
- @Override
- public void postVisit(OptimizerNode visitable) {}
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/IterationNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/IterationNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/IterationNode.java
deleted file mode 100644
index 5d28043..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/IterationNode.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import org.apache.flink.util.Visitor;
-
-/**
- * Interface for optimizer nodes that can pass a visitor on to a step function.
- * NOTE(review): presumably implemented by the optimizer nodes that represent iterations;
- * no implementation is visible here, so confirm.
- */
-public interface IterationNode {
-
- /**
- * Accepts the given visitor for the step function.
- * NOTE(review): presumably traverses the nodes of the iteration's step function with the
- * visitor; the exact traversal is defined by the implementing classes, so confirm there.
- *
- * @param visitor The visitor to apply.
- */
- void acceptForStepFunction(Visitor<OptimizerNode> visitor);
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java
deleted file mode 100644
index cbd58ca..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.operators.AbstractJoinDescriptor;
-import org.apache.flink.optimizer.operators.HashJoinBuildFirstProperties;
-import org.apache.flink.optimizer.operators.HashJoinBuildSecondProperties;
-import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
-import org.apache.flink.optimizer.operators.SortMergeJoinDescriptor;
-import org.apache.flink.configuration.Configuration;
-
-/**
- * The Optimizer representation of a join operator.
- */
-public class JoinNode extends TwoInputNode {
-
- private List<OperatorDescriptorDual> dataProperties;
-
- /**
- * Creates a new JoinNode for the given join operator.
- *
- * @param joinOperatorBase The join operator object.
- */
- public JoinNode(JoinOperatorBase<?, ?, ?, ?> joinOperatorBase) {
- super(joinOperatorBase);
-
- this.dataProperties = getDataProperties(joinOperatorBase,
- joinOperatorBase.getJoinHint(), joinOperatorBase.getCustomPartitioner());
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * Gets the contract object for this match node.
- *
- * @return The contract.
- */
- @Override
- public JoinOperatorBase<?, ?, ?, ?> getOperator() {
- return (JoinOperatorBase<?, ?, ?, ?>) super.getOperator();
- }
-
- @Override
- public String getName() {
- return "Join";
- }
-
- @Override
- protected List<OperatorDescriptorDual> getPossibleProperties() {
- return this.dataProperties;
- }
-
- public void makeJoinWithSolutionSet(int solutionsetInputIndex) {
- OperatorDescriptorDual op;
- if (solutionsetInputIndex == 0) {
- op = new HashJoinBuildFirstProperties(this.keys1, this.keys2);
- } else if (solutionsetInputIndex == 1) {
- op = new HashJoinBuildSecondProperties(this.keys1, this.keys2);
- } else {
- throw new IllegalArgumentException();
- }
-
- this.dataProperties = Collections.singletonList(op);
- }
-
- /**
- * The default estimates build on the principle of inclusion: The smaller input key domain is included in the larger
- * input key domain. We also assume that every key from the larger input has one join partner in the smaller input.
- * The result cardinality is hence the larger one.
- */
- @Override
- protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
- long card1 = getFirstPredecessorNode().getEstimatedNumRecords();
- long card2 = getSecondPredecessorNode().getEstimatedNumRecords();
- this.estimatedNumRecords = (card1 < 0 || card2 < 0) ? -1 : Math.max(card1, card2);
-
- if (this.estimatedNumRecords >= 0) {
- float width1 = getFirstPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
- float width2 = getSecondPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
- float width = (width1 <= 0 || width2 <= 0) ? -1 : width1 + width2;
-
- if (width > 0) {
- this.estimatedOutputSize = (long) (width * this.estimatedNumRecords);
- }
- }
- }
-
- private List<OperatorDescriptorDual> getDataProperties(JoinOperatorBase<?, ?, ?, ?> joinOperatorBase, JoinHint joinHint,
- Partitioner<?> customPartitioner)
- {
- // see if an internal hint dictates the strategy to use
- Configuration conf = joinOperatorBase.getParameters();
- String localStrategy = conf.getString(Optimizer.HINT_LOCAL_STRATEGY, null);
-
- if (localStrategy != null) {
- final AbstractJoinDescriptor fixedDriverStrat;
- if (Optimizer.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE.equals(localStrategy) ||
- Optimizer.HINT_LOCAL_STRATEGY_SORT_FIRST_MERGE.equals(localStrategy) ||
- Optimizer.HINT_LOCAL_STRATEGY_SORT_SECOND_MERGE.equals(localStrategy) ||
- Optimizer.HINT_LOCAL_STRATEGY_MERGE.equals(localStrategy) )
- {
- fixedDriverStrat = new SortMergeJoinDescriptor(this.keys1, this.keys2);
- }
- else if (Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST.equals(localStrategy)) {
- fixedDriverStrat = new HashJoinBuildFirstProperties(this.keys1, this.keys2);
- }
- else if (Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND.equals(localStrategy)) {
- fixedDriverStrat = new HashJoinBuildSecondProperties(this.keys1, this.keys2);
- }
- else {
- throw new CompilerException("Invalid local strategy hint for match contract: " + localStrategy);
- }
-
- if (customPartitioner != null) {
- fixedDriverStrat.setCustomPartitioner(customPartitioner);
- }
-
- ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
- list.add(fixedDriverStrat);
- return list;
- }
- else {
- ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
-
- joinHint = joinHint == null ? JoinHint.OPTIMIZER_CHOOSES : joinHint;
-
- switch (joinHint) {
- case BROADCAST_HASH_FIRST:
- list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, true, false, false));
- break;
- case BROADCAST_HASH_SECOND:
- list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, true, false));
- break;
- case REPARTITION_HASH_FIRST:
- list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, false, false, true));
- break;
- case REPARTITION_HASH_SECOND:
- list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, false, true));
- break;
- case REPARTITION_SORT_MERGE:
- list.add(new SortMergeJoinDescriptor(this.keys1, this.keys2, false, false, true));
- break;
- case OPTIMIZER_CHOOSES:
- list.add(new SortMergeJoinDescriptor(this.keys1, this.keys2));
- list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2));
- list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2));
- break;
- default:
- throw new CompilerException("Unrecognized join hint: " + joinHint);
- }
-
- if (customPartitioner != null) {
- for (OperatorDescriptorDual descr : list) {
- ((AbstractJoinDescriptor) descr).setCustomPartitioner(customPartitioner);
- }
- }
-
- return list;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MapNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MapNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MapNode.java
deleted file mode 100644
index 35def59..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MapNode.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.SingleInputOperator;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.operators.MapDescriptor;
-import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
-
-/**
- * The optimizer's internal representation of a <i>Map</i> operator node.
- */
-public class MapNode extends SingleInputNode {
-
- private final List<OperatorDescriptorSingle> possibleProperties;
-
- /**
- * Creates a new MapNode for the given operator. The node offers exactly one
- * operator descriptor, a {@link MapDescriptor}.
- *
- * @param operator The map operator.
- */
- public MapNode(SingleInputOperator<?, ?, ?> operator) {
- super(operator);
-
- final OperatorDescriptorSingle mapDescriptor = new MapDescriptor();
- this.possibleProperties = Collections.singletonList(mapDescriptor);
- }
-
- /**
- * Gets the name of this node type.
- *
- * @return The constant string {@code "Map"}.
- */
- @Override
- public String getName() {
- return "Map";
- }
-
- /**
- * Gets the operator descriptors of this node.
- *
- * @return The singleton list created in the constructor.
- */
- @Override
- protected List<OperatorDescriptorSingle> getPossibleProperties() {
- return this.possibleProperties;
- }
-
- /**
- * Computes the estimates for the Map operator.
- * We assume that by default, Map takes one value and transforms it into another value.
- * The cardinality consequently stays the same, so the estimated number of records is
- * taken over from the predecessor node.
- */
- @Override
- protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
- this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MapPartitionNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MapPartitionNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MapPartitionNode.java
deleted file mode 100644
index b287c33..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MapPartitionNode.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.SingleInputOperator;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.operators.MapPartitionDescriptor;
-import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
-
-/**
- * The optimizer's internal representation of a <i>MapPartition</i> operator node.
- */
-public class MapPartitionNode extends SingleInputNode {
-
- private final List<OperatorDescriptorSingle> possibleProperties;
-
- /**
- * Creates a new MapPartitionNode for the given operator. The node offers exactly one
- * operator descriptor, a {@link MapPartitionDescriptor}.
- *
- * @param operator The map partition operator.
- */
- public MapPartitionNode(SingleInputOperator<?, ?, ?> operator) {
- super(operator);
-
- final OperatorDescriptorSingle mapPartitionDescriptor = new MapPartitionDescriptor();
- this.possibleProperties = Collections.singletonList(mapPartitionDescriptor);
- }
-
- /**
- * Gets the name of this node type.
- *
- * @return The constant string {@code "MapPartition"}.
- */
- @Override
- public String getName() {
- return "MapPartition";
- }
-
- /**
- * Gets the operator descriptors of this node.
- *
- * @return The singleton list created in the constructor.
- */
- @Override
- protected List<OperatorDescriptorSingle> getPossibleProperties() {
- return this.possibleProperties;
- }
-
- /**
- * Intentionally computes nothing for the MapPartition operator: unlike for a Map,
- * no estimate fields are set here.
- */
- @Override
- protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
- // we really cannot make any estimates here
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java
deleted file mode 100644
index de3cd22..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.operators.HashJoinBuildFirstProperties;
-import org.apache.flink.optimizer.operators.HashJoinBuildSecondProperties;
-import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
-import org.apache.flink.optimizer.operators.SortMergeJoinDescriptor;
-import org.apache.flink.configuration.Configuration;
-
-/**
- * The Optimizer representation of a join operator.
- */
-public class MatchNode extends TwoInputNode {
-
- private List<OperatorDescriptorDual> dataProperties;
-
- /**
- * Creates a new MatchNode for the given join operator.
- *
- * @param joinOperatorBase The join operator object.
- */
- public MatchNode(JoinOperatorBase<?, ?, ?, ?> joinOperatorBase) {
- super(joinOperatorBase);
- this.dataProperties = getDataProperties(joinOperatorBase, joinOperatorBase.getJoinHint());
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * Gets the contract object for this match node.
- *
- * @return The contract.
- */
- @Override
- public JoinOperatorBase<?, ?, ?, ?> getOperator() {
- return (JoinOperatorBase<?, ?, ?, ?>) super.getOperator();
- }
-
- @Override
- public String getName() {
- return "Join";
- }
-
- @Override
- protected List<OperatorDescriptorDual> getPossibleProperties() {
- return this.dataProperties;
- }
-
- /**
- * Replaces the possible operator descriptors by a single hash join that builds its
- * table from the input with the given index.
- *
- * @param solutionsetInputIndex The index of the solution set input, either 0 or 1.
- * @throws IllegalArgumentException If the index is neither 0 nor 1.
- */
- public void makeJoinWithSolutionSet(int solutionsetInputIndex) {
- OperatorDescriptorDual op;
- if (solutionsetInputIndex == 0) {
- op = new HashJoinBuildFirstProperties(this.keys1, this.keys2);
- } else if (solutionsetInputIndex == 1) {
- op = new HashJoinBuildSecondProperties(this.keys1, this.keys2);
- } else {
- throw new IllegalArgumentException(
- "The solution set input index must be 0 or 1, but was " + solutionsetInputIndex + ".");
- }
-
- this.dataProperties = Collections.singletonList(op);
- }
-
- /**
- * The default estimates build on the principle of inclusion: The smaller input key domain is included in the larger
- * input key domain. We also assume that every key from the larger input has one join partner in the smaller input.
- * The result cardinality is hence the larger one.
- */
- @Override
- protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
- long card1 = getFirstPredecessorNode().getEstimatedNumRecords();
- long card2 = getSecondPredecessorNode().getEstimatedNumRecords();
- // a negative cardinality on either side yields -1 for the result
- this.estimatedNumRecords = (card1 < 0 || card2 < 0) ? -1 : Math.max(card1, card2);
-
- if (this.estimatedNumRecords >= 0) {
- float width1 = getFirstPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
- float width2 = getSecondPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
- float width = (width1 <= 0 || width2 <= 0) ? -1 : width1 + width2;
-
- // the output size is only set when both input widths are positive
- if (width > 0) {
- this.estimatedOutputSize = (long) (width * this.estimatedNumRecords);
- }
- }
- }
-
- /**
- * Computes the operator descriptors offered by this node. An internal local strategy hint
- * in the operator parameters takes precedence over the join hint.
- *
- * @param joinOperatorBase The join operator, whose parameters may carry a local strategy hint.
- * @param joinHint The join hint; {@code null} is treated as {@link JoinHint#OPTIMIZER_CHOOSES}.
- * @return A mutable list of the possible operator descriptors.
- * @throws CompilerException If the local strategy hint or the join hint is not recognized.
- */
- private List<OperatorDescriptorDual> getDataProperties(JoinOperatorBase<?, ?, ?, ?> joinOperatorBase, JoinHint joinHint) {
- // see if an internal hint dictates the strategy to use
- final Configuration conf = joinOperatorBase.getParameters();
- final String localStrategy = conf.getString(Optimizer.HINT_LOCAL_STRATEGY, null);
-
- final List<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
- if (localStrategy != null) {
- list.add(createDescriptorForLocalStrategy(localStrategy));
- } else {
- addDescriptorsForJoinHint(joinHint == null ? JoinHint.OPTIMIZER_CHOOSES : joinHint, list);
- }
-
- return list;
- }
-
- /**
- * Creates the single join descriptor that a local strategy hint dictates.
- *
- * @param localStrategy The non-null value of the local strategy hint.
- * @return The descriptor for the hinted strategy.
- * @throws CompilerException If the hint is not a recognized join strategy.
- */
- private OperatorDescriptorDual createDescriptorForLocalStrategy(String localStrategy) {
- if (Optimizer.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE.equals(localStrategy) ||
- Optimizer.HINT_LOCAL_STRATEGY_SORT_FIRST_MERGE.equals(localStrategy) ||
- Optimizer.HINT_LOCAL_STRATEGY_SORT_SECOND_MERGE.equals(localStrategy) ||
- Optimizer.HINT_LOCAL_STRATEGY_MERGE.equals(localStrategy) )
- {
- return new SortMergeJoinDescriptor(this.keys1, this.keys2);
- } else if (Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST.equals(localStrategy)) {
- return new HashJoinBuildFirstProperties(this.keys1, this.keys2);
- } else if (Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND.equals(localStrategy)) {
- return new HashJoinBuildSecondProperties(this.keys1, this.keys2);
- } else {
- throw new CompilerException("Invalid local strategy hint for match contract: " + localStrategy);
- }
- }
-
- /**
- * Adds the join descriptors that correspond to the given join hint to the target list.
- *
- * @param joinHint The non-null join hint.
- * @param target The list to which the descriptors are added.
- * @throws CompilerException If the join hint is not recognized.
- */
- private void addDescriptorsForJoinHint(JoinHint joinHint, List<OperatorDescriptorDual> target) {
- switch (joinHint) {
- case BROADCAST_HASH_FIRST:
- target.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, true, false, false));
- break;
- case BROADCAST_HASH_SECOND:
- target.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, true, false));
- break;
- case REPARTITION_HASH_FIRST:
- target.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, false, false, true));
- break;
- case REPARTITION_HASH_SECOND:
- target.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, false, true));
- break;
- case REPARTITION_SORT_MERGE:
- target.add(new SortMergeJoinDescriptor(this.keys1, this.keys2, false, false, true));
- break;
- case OPTIMIZER_CHOOSES:
- // no preference given: offer all three strategies
- target.add(new SortMergeJoinDescriptor(this.keys1, this.keys2));
- target.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2));
- target.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2));
- break;
- default:
- throw new CompilerException("Unrecognized join hint: " + joinHint);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/NoOpNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/NoOpNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/NoOpNode.java
deleted file mode 100644
index 76467cf..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/NoOpNode.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.dag;
-
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.operators.NoOpDescriptor;
-
-/**
- * The optimizer's internal representation of a <i>No Operation</i> node.
- */
-public class NoOpNode extends UnaryOperatorNode {
-
- /**
- * Creates a no-op node with the name {@code "No Op"}.
- */
- public NoOpNode() {
- this("No Op");
- }
-
- /**
- * Creates a no-op node with the given name, an empty field set and a {@link NoOpDescriptor}.
- *
- * @param name The name of the node.
- */
- public NoOpNode(String name) {
- super(name, new FieldSet(), new NoOpDescriptor());
- }
-
- /**
- * Takes over the estimated number of records and the estimated output size
- * from the predecessor node unchanged.
- */
- @Override
- protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
- this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords();
- this.estimatedOutputSize = getPredecessorNode().getEstimatedOutputSize();
- }
-}