You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:07:17 UTC

[38/53] [abbrv] flink git commit: [optimizer] Rename optimizer project to "flink-optimizer" (previously flink-compiler)

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
deleted file mode 100644
index dbe04f4..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.common.distributions.DataDistribution;
-import org.apache.flink.api.common.operators.GenericDataSinkBase;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.costs.CostEstimator;
-import org.apache.flink.optimizer.dataproperties.InterestingProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.util.Visitor;
-
-/**
- * The Optimizer representation of a data sink.
- */
-public class DataSinkNode extends OptimizerNode {
-	
-	protected DagConnection input;			// The input edge
-	
-	/**
-	 * Creates a new DataSinkNode for the given sink operator.
-	 * 
-	 * @param sink The data sink contract object.
-	 */
-	public DataSinkNode(GenericDataSinkBase<?> sink) {
-		super(sink);
-	}
-
-	// --------------------------------------------------------------------------------------
-	
-	/**
-	 * Gets the input of the sink.
-	 * 
-	 * @return The input connection.
-	 */
-	public DagConnection getInputConnection() {
-		return this.input;
-	}
-	
-	/**
-	 * Gets the predecessor of this node.
-	 *
-	 * @return The predecessor, or null, if no predecessor has been set.
-	 */
-	public OptimizerNode getPredecessorNode() {
-		if(this.input != null) {
-			return input.getSource();
-		} else {
-			return null;
-		}
-	}
-
-	/**
-	 * Gets the operator for which this optimizer sink node was created.
-	 * 
-	 * @return The node's underlying operator.
-	 */
-	@Override
-	public GenericDataSinkBase<?> getOperator() {
-		return (GenericDataSinkBase<?>) super.getOperator();
-	}
-
-	@Override
-	public String getName() {
-		return "Data Sink";
-	}
-
-	@Override
-	public List<DagConnection> getIncomingConnections() {
-		return Collections.singletonList(this.input);
-	}
-
-	/**
-	 * Gets all outgoing connections, which is an empty set for the data sink.
-	 *
-	 * @return An empty list.
-	 */
-	@Override
-	public List<DagConnection> getOutgoingConnections() {
-		return Collections.emptyList();
-	}
-
-	@Override
-	public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, ExecutionMode defaultExchangeMode) {
-		Operator<?> children = getOperator().getInput();
-
-		final OptimizerNode pred;
-		final DagConnection conn;
-		
-		pred = contractToNode.get(children);
-		conn = new DagConnection(pred, this, defaultExchangeMode);
-			
-		// create the connection and add it
-		this.input = conn;
-		pred.addOutgoingConnection(conn);
-	}
-
-	/**
-	 * Computes the estimated outputs for the data sink. Since the sink does not modify anything, it simply
-	 * copies the output estimates from its direct predecessor.
-	 */
-	@Override
-	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
-		this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords();
-		this.estimatedOutputSize = getPredecessorNode().getEstimatedOutputSize();
-	}
-
-	@Override
-	public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
-		final InterestingProperties iProps = new InterestingProperties();
-		
-		{
-			final Ordering partitioning = getOperator().getPartitionOrdering();
-			final DataDistribution dataDist = getOperator().getDataDistribution();
-			final RequestedGlobalProperties partitioningProps = new RequestedGlobalProperties();
-			if (partitioning != null) {
-				if(dataDist != null) {
-					partitioningProps.setRangePartitioned(partitioning, dataDist);
-				} else {
-					partitioningProps.setRangePartitioned(partitioning);
-				}
-				iProps.addGlobalProperties(partitioningProps);
-			}
-			iProps.addGlobalProperties(partitioningProps);
-		}
-		
-		{
-			final Ordering localOrder = getOperator().getLocalOrder();
-			final RequestedLocalProperties orderProps = new RequestedLocalProperties();
-			if (localOrder != null) {
-				orderProps.setOrdering(localOrder);
-			}
-			iProps.addLocalProperties(orderProps);
-		}
-		
-		this.input.setInterestingProperties(iProps);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//                                     Branch Handling
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public void computeUnclosedBranchStack() {
-		if (this.openBranches != null) {
-			return;
-		}
-
-		// we need to track open branches even in the sinks, because they get "closed" when
-		// we build a single "root" for the data flow plan
-		addClosedBranches(getPredecessorNode().closedBranchingNodes);
-		this.openBranches = getPredecessorNode().getBranchesForParent(this.input);
-	}
-	
-	@Override
-	protected List<UnclosedBranchDescriptor> getBranchesForParent(DagConnection parent) {
-		// return our own stack of open branches, because nothing is added
-		return this.openBranches;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//                                   Recursive Optimization
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public List<PlanNode> getAlternativePlans(CostEstimator estimator) {
-		// check if we have a cached version
-		if (this.cachedPlans != null) {
-			return this.cachedPlans;
-		}
-		
-		// calculate alternative sub-plans for predecessor
-		List<? extends PlanNode> subPlans = getPredecessorNode().getAlternativePlans(estimator);
-		List<PlanNode> outputPlans = new ArrayList<PlanNode>();
-		
-		final int dop = getParallelism();
-		final int inDop = getPredecessorNode().getParallelism();
-
-		final ExecutionMode executionMode = this.input.getDataExchangeMode();
-		final boolean dopChange = dop != inDop;
-		final boolean breakPipeline = this.input.isBreakingPipeline();
-
-		InterestingProperties ips = this.input.getInterestingProperties();
-		for (PlanNode p : subPlans) {
-			for (RequestedGlobalProperties gp : ips.getGlobalProperties()) {
-				for (RequestedLocalProperties lp : ips.getLocalProperties()) {
-					Channel c = new Channel(p);
-					gp.parameterizeChannel(c, dopChange, executionMode, breakPipeline);
-					lp.parameterizeChannel(c);
-					c.setRequiredLocalProps(lp);
-					c.setRequiredGlobalProps(gp);
-					
-					// no need to check whether the created properties meet what we need in case
-					// of ordering or global ordering, because the only interesting properties we have
-					// are what we require
-					outputPlans.add(new SinkPlanNode(this, "DataSink ("+this.getOperator().getName()+")" ,c));
-				}
-			}
-		}
-		
-		// cost and prune the plans
-		for (PlanNode node : outputPlans) {
-			estimator.costOperator(node);
-		}
-		prunePlanAlternatives(outputPlans);
-
-		this.cachedPlans = outputPlans;
-		return outputPlans;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//                                   Function Annotation Handling
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public SemanticProperties getSemanticProperties() {
-		return new EmptySemanticProperties();
-	}
-		
-	// --------------------------------------------------------------------------------------------
-	//                                     Miscellaneous
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public void accept(Visitor<OptimizerNode> visitor) {
-		if (visitor.preVisit(this)) {
-			if (getPredecessorNode() != null) {
-				getPredecessorNode().accept(visitor);
-			} else {
-				throw new CompilerException();
-			}
-			visitor.postVisit(this);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java
deleted file mode 100644
index e4b35b7..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.io.FileInputFormat;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.NonParallelInput;
-import org.apache.flink.api.common.io.ReplicatingInputFormat;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.common.operators.GenericDataSourceBase;
-import org.apache.flink.api.common.operators.GenericDataSourceBase.SplitDataProperties;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.costs.CostEstimator;
-import org.apache.flink.optimizer.costs.Costs;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SourcePlanNode;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.Visitor;
-
-/**
- * The optimizer's internal representation of a data source.
- */
-public class DataSourceNode extends OptimizerNode {
-	
-	private final boolean sequentialInput;
-
-	private final boolean replicatedInput;
-
-	private GlobalProperties gprops;
-
-	private LocalProperties lprops;
-
-	/**
-	 * Creates a new DataSourceNode for the given contract.
-	 * 
-	 * @param pactContract
-	 *        The data source contract object.
-	 */
-	public DataSourceNode(GenericDataSourceBase<?, ?> pactContract) {
-		super(pactContract);
-		
-		if (pactContract.getUserCodeWrapper().getUserCodeClass() == null) {
-			throw new IllegalArgumentException("Input format has not been set.");
-		}
-		
-		if (NonParallelInput.class.isAssignableFrom(pactContract.getUserCodeWrapper().getUserCodeClass())) {
-			setDegreeOfParallelism(1);
-			this.sequentialInput = true;
-		} else {
-			this.sequentialInput = false;
-		}
-
-		this.replicatedInput = ReplicatingInputFormat.class.isAssignableFrom(
-														pactContract.getUserCodeWrapper().getUserCodeClass());
-
-		this.gprops = new GlobalProperties();
-		this.lprops = new LocalProperties();
-
-		SplitDataProperties<?> splitProps = pactContract.getSplitDataProperties();
-
-		if(replicatedInput) {
-			this.gprops.setFullyReplicated();
-			this.lprops = new LocalProperties();
-		} else if (splitProps != null) {
-			// configure data properties of data source using split properties
-			setDataPropertiesFromSplitProperties(splitProps);
-		}
-
-	}
-
-	/**
-	 * Gets the contract object for this data source node.
-	 * 
-	 * @return The contract.
-	 */
-	@Override
-	public GenericDataSourceBase<?, ?> getOperator() {
-		return (GenericDataSourceBase<?, ?>) super.getOperator();
-	}
-
-	@Override
-	public String getName() {
-		return "Data Source";
-	}
-
-	@Override
-	public void setDegreeOfParallelism(int degreeOfParallelism) {
-		// if unsplittable, parallelism remains at 1
-		if (!this.sequentialInput) {
-			super.setDegreeOfParallelism(degreeOfParallelism);
-		}
-	}
-
-	@Override
-	public List<DagConnection> getIncomingConnections() {
-		return Collections.<DagConnection>emptyList();
-	}
-
-	@Override
-	public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, ExecutionMode defaultDataExchangeMode) {}
-
-	@Override
-	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
-		// see, if we have a statistics object that can tell us a bit about the file
-		if (statistics != null) {
-			// instantiate the input format, as this is needed by the statistics 
-			InputFormat<?, ?> format;
-			String inFormatDescription = "<unknown>";
-			
-			try {
-				format = getOperator().getFormatWrapper().getUserCodeObject();
-				Configuration config = getOperator().getParameters();
-				format.configure(config);
-			}
-			catch (Throwable t) {
-				if (Optimizer.LOG.isWarnEnabled()) {
-					Optimizer.LOG.warn("Could not instantiate InputFormat to obtain statistics."
-						+ " Limited statistics will be available.", t);
-				}
-				return;
-			}
-			try {
-				inFormatDescription = format.toString();
-			}
-			catch (Throwable t) {
-				// we can ignore this error, as it only prevents us from using a cosmetic string
-			}
-			
-			// first of all, get the statistics from the cache
-			final String statisticsKey = getOperator().getStatisticsKey();
-			final BaseStatistics cachedStatistics = statistics.getBaseStatistics(statisticsKey);
-			
-			BaseStatistics bs = null;
-			try {
-				bs = format.getStatistics(cachedStatistics);
-			}
-			catch (Throwable t) {
-				if (Optimizer.LOG.isWarnEnabled()) {
-					Optimizer.LOG.warn("Error obtaining statistics from input format: " + t.getMessage(), t);
-				}
-			}
-			
-			if (bs != null) {
-				final long len = bs.getTotalInputSize();
-				if (len == BaseStatistics.SIZE_UNKNOWN) {
-					if (Optimizer.LOG.isInfoEnabled()) {
-						Optimizer.LOG.info("Compiler could not determine the size of input '" + inFormatDescription + "'. Using default estimates.");
-					}
-				}
-				else if (len >= 0) {
-					this.estimatedOutputSize = len;
-				}
-				
-				final long card = bs.getNumberOfRecords();
-				if (card != BaseStatistics.NUM_RECORDS_UNKNOWN) {
-					this.estimatedNumRecords = card;
-				}
-			}
-		}
-	}
-
-	@Override
-	public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
-		// no children, so nothing to compute
-	}
-
-	@Override
-	public void computeUnclosedBranchStack() {
-		// because there are no inputs, there are no unclosed branches.
-		this.openBranches = Collections.emptyList();
-	}
-
-	@Override
-	public List<PlanNode> getAlternativePlans(CostEstimator estimator) {
-		if (this.cachedPlans != null) {
-			return this.cachedPlans;
-		}
-
-		SourcePlanNode candidate = new SourcePlanNode(this, "DataSource ("+this.getOperator().getName()+")",
-				this.gprops, this.lprops);
-
-		if(!replicatedInput) {
-			candidate.updatePropertiesWithUniqueSets(getUniqueFields());
-
-			final Costs costs = new Costs();
-			if (FileInputFormat.class.isAssignableFrom(getOperator().getFormatWrapper().getUserCodeClass()) &&
-					this.estimatedOutputSize >= 0) {
-				estimator.addFileInputCost(this.estimatedOutputSize, costs);
-			}
-			candidate.setCosts(costs);
-		} else {
-			// replicated input
-			final Costs costs = new Costs();
-			InputFormat<?,?> inputFormat =
-					((ReplicatingInputFormat<?,?>) getOperator().getFormatWrapper().getUserCodeObject()).getReplicatedInputFormat();
-			if (FileInputFormat.class.isAssignableFrom(inputFormat.getClass()) &&
-					this.estimatedOutputSize >= 0) {
-				estimator.addFileInputCost(this.estimatedOutputSize * this.getParallelism(), costs);
-			}
-			candidate.setCosts(costs);
-		}
-
-		// since there is only a single plan for the data-source, return a list with that element only
-		List<PlanNode> plans = new ArrayList<PlanNode>(1);
-		plans.add(candidate);
-
-		this.cachedPlans = plans;
-		return plans;
-	}
-
-	@Override
-	public SemanticProperties getSemanticProperties() {
-		return new EmptySemanticProperties();
-	}
-	
-	@Override
-	public void accept(Visitor<OptimizerNode> visitor) {
-		if (visitor.preVisit(this)) {
-			visitor.postVisit(this);
-		}
-	}
-
-	private void setDataPropertiesFromSplitProperties(SplitDataProperties splitProps) {
-
-		// set global properties
-		int[] partitionKeys = splitProps.getSplitPartitionKeys();
-		Partitioner<?> partitioner = splitProps.getSplitPartitioner();
-
-		if(partitionKeys != null && partitioner != null) {
-			this.gprops.setCustomPartitioned(new FieldList(partitionKeys), partitioner);
-		}
-		else if(partitionKeys != null) {
-			this.gprops.setAnyPartitioning(new FieldList(partitionKeys));
-		}
-		// set local properties
-		int[] groupingKeys = splitProps.getSplitGroupKeys();
-		Ordering ordering = splitProps.getSplitOrder();
-
-		// more than one split per source tasks possible.
-		// adapt split grouping and sorting
-		if(ordering != null) {
-
-			// sorting falls back to grouping because a source can read multiple,
-			// randomly assigned splits
-			groupingKeys = ordering.getFieldPositions();
-		}
-
-		if(groupingKeys != null && partitionKeys != null) {
-			// check if grouping is also valid across splits, i.e., whether grouping keys are
-			// valid superset of partition keys
-			boolean allFieldsIncluded = true;
-			for(int i : partitionKeys) {
-				boolean fieldIncluded = false;
-				for(int j : groupingKeys) {
-					if(i == j) {
-						fieldIncluded = true;
-						break;
-					}
-				}
-				if(!fieldIncluded) {
-					allFieldsIncluded = false;
-					break;
-				}
-			}
-			if (allFieldsIncluded) {
-				this.lprops = LocalProperties.forGrouping(new FieldList(groupingKeys));
-			} else {
-				this.lprops = new LocalProperties();
-			}
-
-		} else {
-			this.lprops = new LocalProperties();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/EstimateProvider.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/EstimateProvider.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/EstimateProvider.java
deleted file mode 100644
index 482951b..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/EstimateProvider.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-/**
- * Methods for operators / connections that provide estimates about data size and
- * characteristics.
- */
-public interface EstimateProvider {
-	
-	/**
-	 * Gets the estimated output size from this node.
-	 * 
-	 * @return The estimated output size.
-	 */
-	long getEstimatedOutputSize();
-
-	/**
-	 * Gets the estimated number of records in the output of this node.
-	 * 
-	 * @return The estimated number of records.
-	 */
-	long getEstimatedNumRecords();
-	
-	/**
-	 * Gets the estimated number of bytes per record.
-	 * 
-	 * @return The estimated number of bytes per record.
-	 */
-	float getEstimatedAvgWidthPerOutputRecord();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java
deleted file mode 100644
index 118ddc8..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
-import org.apache.flink.api.common.operators.base.FilterOperatorBase;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.operators.FilterDescriptor;
-import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
-
-/**
- * The optimizer's internal representation of a <i>Filter</i> operator node.
- */
-public class FilterNode extends SingleInputNode {
-	
-	private final List<OperatorDescriptorSingle> possibleProperties;
-	
-	public FilterNode(FilterOperatorBase<?, ?> operator) {
-		super(operator);
-		this.possibleProperties = Collections.<OperatorDescriptorSingle>singletonList(new FilterDescriptor());
-	}
-
-	@Override
-	public FilterOperatorBase<?, ?> getOperator() {
-		return (FilterOperatorBase<?, ?>) super.getOperator();
-	}
-
-	@Override
-	public String getName() {
-		return "Filter";
-	}
-
-	@Override
-	public SemanticProperties getSemanticProperties() {
-		return new SingleInputSemanticProperties.AllFieldsForwardedProperties();
-	}
-
-	@Override
-	protected List<OperatorDescriptorSingle> getPossibleProperties() {
-		return this.possibleProperties;
-	}
-
-	/**
-	 * Computes the estimates for the Filter operator. Since it applies a filter on the data we assume a cardinality
-	 * decrease. To give the system a hint at data decrease, we use a default magic number to indicate a 0.5 decrease. 
-	 */
-	@Override
-	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
-		this.estimatedNumRecords = (long) (getPredecessorNode().getEstimatedNumRecords() * 0.5);
-		this.estimatedOutputSize = (long) (getPredecessorNode().getEstimatedOutputSize() * 0.5);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java
deleted file mode 100644
index f713d56..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.base.FlatMapOperatorBase;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.operators.FlatMapDescriptor;
-import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
-
-/**
- * The optimizer's internal representation of a <i>FlatMap</i> operator node.
- */
-public class FlatMapNode extends SingleInputNode {
-	
-	private final List<OperatorDescriptorSingle> possibleProperties;
-	
-	public FlatMapNode(FlatMapOperatorBase<?, ?, ?> operator) {
-		super(operator);
-		
-		this.possibleProperties = Collections.<OperatorDescriptorSingle>singletonList(new FlatMapDescriptor());
-	}
-
-	@Override
-	public FlatMapOperatorBase<?, ?, ?> getOperator() {
-		return (FlatMapOperatorBase<?, ?, ?>) super.getOperator();
-	}
-
-	@Override
-	public String getName() {
-		return "FlatMap";
-	}
-
-	@Override
-	protected List<OperatorDescriptorSingle> getPossibleProperties() {
-		return this.possibleProperties;
-	}
-
-	/**
-	 * Computes the estimates for the FlatMap operator. Since it un-nests, we assume a cardinality
-	 * increase. To give the system a hint at data increase, we take a default magic number of a 5 times increase. 
-	 */
-	@Override
-	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
-		this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords() * 5;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
deleted file mode 100644
index 564c0d3..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.operators.AllGroupCombineProperties;
-import org.apache.flink.optimizer.operators.GroupCombineProperties;
-import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
-
-import java.util.Collections;
-import java.util.List;
-
-/**
- * The optimizer representation of a <i>GroupCombine</i> operation.
- */
-public class GroupCombineNode extends SingleInputNode {
-
-	private final List<OperatorDescriptorSingle> possibleProperties;
-
-	/**
-	 * Creates a new optimizer node for the given operator.
-	 *
-	 * @param operator The group combine operation.
-	 */
-	public GroupCombineNode(GroupCombineOperatorBase<?, ?, ?> operator) {
-		super(operator);
-
-		if (this.keys == null) {
-			// case of a key-less reducer. force a parallelism of 1
-			setDegreeOfParallelism(1);
-		}
-
-		this.possibleProperties = initPossibleProperties();
-	}
-
-	private List<OperatorDescriptorSingle> initPossibleProperties() {
-
-		// check if we can work with a grouping (simple reducer), or if we need ordering because of a group order
-		Ordering groupOrder = getOperator().getGroupOrder();
-		if (groupOrder != null && groupOrder.getNumberOfFields() == 0) {
-			groupOrder = null;
-		}
-
-		OperatorDescriptorSingle props = (this.keys == null ?
-				new AllGroupCombineProperties() :
-				new GroupCombineProperties(this.keys, groupOrder));
-
-		return Collections.singletonList(props);
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Gets the operator represented by this optimizer node.
-	 *
-	 * @return The operator represented by this optimizer node.
-	 */
-	@Override
-	public GroupCombineOperatorBase<?, ?, ?> getOperator() {
-		return (GroupCombineOperatorBase<?, ?, ?>) super.getOperator();
-	}
-
-	@Override
-	public String getName() {
-		return "GroupCombine";
-	}
-
-	@Override
-	protected List<OperatorDescriptorSingle> getPossibleProperties() {
-		return this.possibleProperties;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Estimates
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
-		// no real estimates possible for a reducer.
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
deleted file mode 100644
index 77acae5..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.operators.AllGroupReduceProperties;
-import org.apache.flink.optimizer.operators.AllGroupWithPartialPreGroupProperties;
-import org.apache.flink.optimizer.operators.GroupReduceProperties;
-import org.apache.flink.optimizer.operators.GroupReduceWithCombineProperties;
-import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
-import org.apache.flink.configuration.Configuration;
-
-/**
- * The optimizer representation of a <i>GroupReduce</i> operation.
- */
-public class GroupReduceNode extends SingleInputNode {
-	
-	private final List<OperatorDescriptorSingle> possibleProperties;
-	
-	private GroupReduceNode combinerUtilityNode;
-	
-	/**
-	 * Creates a new optimizer node for the given operator.
-	 * 
-	 * @param operator The reduce operation.
-	 */
-	public GroupReduceNode(GroupReduceOperatorBase<?, ?, ?> operator) {
-		super(operator);
-		
-		if (this.keys == null) {
-			// case of a key-less reducer. force a parallelism of 1
-			setDegreeOfParallelism(1);
-		}
-		
-		this.possibleProperties = initPossibleProperties(operator.getCustomPartitioner());
-	}
-	
-	public GroupReduceNode(GroupReduceNode reducerToCopyForCombiner) {
-		super(reducerToCopyForCombiner);
-		
-		this.possibleProperties = Collections.emptyList();
-	}
-	
-	private List<OperatorDescriptorSingle> initPossibleProperties(Partitioner<?> customPartitioner) {
-		// see if an internal hint dictates the strategy to use
-		final Configuration conf = getOperator().getParameters();
-		final String localStrategy = conf.getString(Optimizer.HINT_LOCAL_STRATEGY, null);
-
-		final boolean useCombiner;
-		if (localStrategy != null) {
-			if (Optimizer.HINT_LOCAL_STRATEGY_SORT.equals(localStrategy)) {
-				useCombiner = false;
-			}
-			else if (Optimizer.HINT_LOCAL_STRATEGY_COMBINING_SORT.equals(localStrategy)) {
-				if (!isCombineable()) {
-					Optimizer.LOG.warn("Strategy hint for GroupReduce '" + getOperator().getName() +
-						"' requires combinable reduce, but user function is not marked combinable.");
-				}
-				useCombiner = true;
-			} else {
-				throw new CompilerException("Invalid local strategy hint for match contract: " + localStrategy);
-			}
-		} else {
-			useCombiner = isCombineable();
-		}
-		
-		// check if we can work with a grouping (simple reducer), or if we need ordering because of a group order
-		Ordering groupOrder = null;
-		if (getOperator() instanceof GroupReduceOperatorBase) {
-			groupOrder = ((GroupReduceOperatorBase<?, ?, ?>) getOperator()).getGroupOrder();
-			if (groupOrder != null && groupOrder.getNumberOfFields() == 0) {
-				groupOrder = null;
-			}
-		}
-		
-		OperatorDescriptorSingle props = useCombiner ?
-			(this.keys == null ? new AllGroupWithPartialPreGroupProperties() : new GroupReduceWithCombineProperties(this.keys, groupOrder, customPartitioner)) :
-			(this.keys == null ? new AllGroupReduceProperties() : new GroupReduceProperties(this.keys, groupOrder, customPartitioner));
-
-		return Collections.singletonList(props);
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Gets the operator represented by this optimizer node.
-	 * 
-	 * @return The operator represented by this optimizer node.
-	 */
-	@Override
-	public GroupReduceOperatorBase<?, ?, ?> getOperator() {
-		return (GroupReduceOperatorBase<?, ?, ?>) super.getOperator();
-	}
-
-	/**
-	 * Checks, whether a combiner function has been given for the function encapsulated
-	 * by this reduce contract.
-	 * 
-	 * @return True, if a combiner has been given, false otherwise.
-	 */
-	public boolean isCombineable() {
-		return getOperator().isCombinable();
-	}
-
-	@Override
-	public String getName() {
-		return "GroupReduce";
-	}
-	
-	@Override
-	protected List<OperatorDescriptorSingle> getPossibleProperties() {
-		return this.possibleProperties;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Estimates
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
-		// no real estimates possible for a reducer.
-	}
-	
-	public GroupReduceNode getCombinerUtilityNode() {
-		if (this.combinerUtilityNode == null) {
-			this.combinerUtilityNode = new GroupReduceNode(this);
-			
-			// we conservatively assume the combiner returns the same data size as it consumes 
-			this.combinerUtilityNode.estimatedOutputSize = getPredecessorNode().getEstimatedOutputSize();
-			this.combinerUtilityNode.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords();
-		}
-		return this.combinerUtilityNode;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/InterestingPropertiesClearer.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/InterestingPropertiesClearer.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/InterestingPropertiesClearer.java
deleted file mode 100644
index 1fdae51..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/InterestingPropertiesClearer.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import org.apache.flink.util.Visitor;
-
-final class InterestingPropertiesClearer implements Visitor<OptimizerNode> {
-	
-	static final InterestingPropertiesClearer INSTANCE = new InterestingPropertiesClearer();
-
-	@Override
-	public boolean preVisit(OptimizerNode visitable) {
-		if (visitable.getInterestingProperties() != null) {
-			visitable.clearInterestingProperties();
-			return true;
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public void postVisit(OptimizerNode visitable) {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/IterationNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/IterationNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/IterationNode.java
deleted file mode 100644
index 5d28043..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/IterationNode.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import org.apache.flink.util.Visitor;
-
-/**
- *
- */
-public interface IterationNode {
-	
-	void acceptForStepFunction(Visitor<OptimizerNode> visitor);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java
deleted file mode 100644
index cbd58ca..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.operators.AbstractJoinDescriptor;
-import org.apache.flink.optimizer.operators.HashJoinBuildFirstProperties;
-import org.apache.flink.optimizer.operators.HashJoinBuildSecondProperties;
-import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
-import org.apache.flink.optimizer.operators.SortMergeJoinDescriptor;
-import org.apache.flink.configuration.Configuration;
-
-/**
- * The Optimizer representation of a join operator.
- */
-public class JoinNode extends TwoInputNode {
-	
-	private List<OperatorDescriptorDual> dataProperties;
-	
-	/**
-	 * Creates a new JoinNode for the given join operator.
-	 * 
-	 * @param joinOperatorBase The join operator object.
-	 */
-	public JoinNode(JoinOperatorBase<?, ?, ?, ?> joinOperatorBase) {
-		super(joinOperatorBase);
-		
-		this.dataProperties = getDataProperties(joinOperatorBase,
-				joinOperatorBase.getJoinHint(), joinOperatorBase.getCustomPartitioner());
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Gets the contract object for this match node.
-	 * 
-	 * @return The contract.
-	 */
-	@Override
-	public JoinOperatorBase<?, ?, ?, ?> getOperator() {
-		return (JoinOperatorBase<?, ?, ?, ?>) super.getOperator();
-	}
-
-	@Override
-	public String getName() {
-		return "Join";
-	}
-
-	@Override
-	protected List<OperatorDescriptorDual> getPossibleProperties() {
-		return this.dataProperties;
-	}
-	
-	public void makeJoinWithSolutionSet(int solutionsetInputIndex) {
-		OperatorDescriptorDual op;
-		if (solutionsetInputIndex == 0) {
-			op = new HashJoinBuildFirstProperties(this.keys1, this.keys2);
-		} else if (solutionsetInputIndex == 1) {
-			op = new HashJoinBuildSecondProperties(this.keys1, this.keys2);
-		} else {
-			throw new IllegalArgumentException();
-		}
-		
-		this.dataProperties = Collections.singletonList(op);
-	}
-	
-	/**
-	 * The default estimates build on the principle of inclusion: The smaller input key domain is included in the larger
-	 * input key domain. We also assume that every key from the larger input has one join partner in the smaller input.
-	 * The result cardinality is hence the larger one.
-	 */
-	@Override
-	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
-		long card1 = getFirstPredecessorNode().getEstimatedNumRecords();
-		long card2 = getSecondPredecessorNode().getEstimatedNumRecords();
-		this.estimatedNumRecords = (card1 < 0 || card2 < 0) ? -1 : Math.max(card1, card2);
-		
-		if (this.estimatedNumRecords >= 0) {
-			float width1 = getFirstPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
-			float width2 = getSecondPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
-			float width = (width1 <= 0 || width2 <= 0) ? -1 : width1 + width2;
-			
-			if (width > 0) {
-				this.estimatedOutputSize = (long) (width * this.estimatedNumRecords);
-			}
-		}
-	}
-	
-	private List<OperatorDescriptorDual> getDataProperties(JoinOperatorBase<?, ?, ?, ?> joinOperatorBase, JoinHint joinHint,
-			Partitioner<?> customPartitioner)
-	{
-		// see if an internal hint dictates the strategy to use
-		Configuration conf = joinOperatorBase.getParameters();
-		String localStrategy = conf.getString(Optimizer.HINT_LOCAL_STRATEGY, null);
-
-		if (localStrategy != null) {
-			final AbstractJoinDescriptor fixedDriverStrat;
-			if (Optimizer.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE.equals(localStrategy) ||
-				Optimizer.HINT_LOCAL_STRATEGY_SORT_FIRST_MERGE.equals(localStrategy) ||
-				Optimizer.HINT_LOCAL_STRATEGY_SORT_SECOND_MERGE.equals(localStrategy) ||
-				Optimizer.HINT_LOCAL_STRATEGY_MERGE.equals(localStrategy) )
-			{
-				fixedDriverStrat = new SortMergeJoinDescriptor(this.keys1, this.keys2);
-			}
-			else if (Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST.equals(localStrategy)) {
-				fixedDriverStrat = new HashJoinBuildFirstProperties(this.keys1, this.keys2);
-			}
-			else if (Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND.equals(localStrategy)) {
-				fixedDriverStrat = new HashJoinBuildSecondProperties(this.keys1, this.keys2);
-			}
-			else {
-				throw new CompilerException("Invalid local strategy hint for match contract: " + localStrategy);
-			}
-			
-			if (customPartitioner != null) {
-				fixedDriverStrat.setCustomPartitioner(customPartitioner);
-			}
-			
-			ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
-			list.add(fixedDriverStrat);
-			return list;
-		}
-		else {
-			ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
-			
-			joinHint = joinHint == null ? JoinHint.OPTIMIZER_CHOOSES : joinHint;
-			
-			switch (joinHint) {
-				case BROADCAST_HASH_FIRST:
-					list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, true, false, false));
-					break;
-				case BROADCAST_HASH_SECOND:
-					list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, true, false));
-					break;
-				case REPARTITION_HASH_FIRST:
-					list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, false, false, true));
-					break;
-				case REPARTITION_HASH_SECOND:
-					list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, false, true));
-					break;
-				case REPARTITION_SORT_MERGE:
-					list.add(new SortMergeJoinDescriptor(this.keys1, this.keys2, false, false, true));
-					break;
-				case OPTIMIZER_CHOOSES:
-					list.add(new SortMergeJoinDescriptor(this.keys1, this.keys2));
-					list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2));
-					list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2));
-					break;
-				default:
-					throw new CompilerException("Unrecognized join hint: " + joinHint);
-			}
-			
-			if (customPartitioner != null) {
-				for (OperatorDescriptorDual descr : list) {
-					((AbstractJoinDescriptor) descr).setCustomPartitioner(customPartitioner);
-				}
-			}
-			
-			return list;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MapNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MapNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MapNode.java
deleted file mode 100644
index 35def59..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MapNode.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.SingleInputOperator;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.operators.MapDescriptor;
-import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
-
-/**
- * The optimizer's internal representation of a <i>Map</i> operator node.
- */
-public class MapNode extends SingleInputNode {
-	
-	private final List<OperatorDescriptorSingle> possibleProperties;
-	
-	/**
-	 * Creates a new MapNode for the given operator.
-	 * 
-	 * @param operator The map operator.
-	 */
-	public MapNode(SingleInputOperator<?, ?, ?> operator) {
-		super(operator);
-		
-		this.possibleProperties = Collections.<OperatorDescriptorSingle>singletonList(new MapDescriptor());
-	}
-
-	@Override
-	public String getName() {
-		return "Map";
-	}
-
-	@Override
-	protected List<OperatorDescriptorSingle> getPossibleProperties() {
-		return this.possibleProperties;
-	}
-
-	/**
-	 * Computes the estimates for the Map operator. 
-	 * We assume that by default, Map takes one value and transforms it into another value.
-	 * The cardinality consequently stays the same.
-	 */
-	@Override
-	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
-		this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MapPartitionNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MapPartitionNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MapPartitionNode.java
deleted file mode 100644
index b287c33..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MapPartitionNode.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.SingleInputOperator;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.operators.MapPartitionDescriptor;
-import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
-
-/**
- * The optimizer's internal representation of a <i>MapPartition</i> operator node.
- */
-public class MapPartitionNode extends SingleInputNode {
-	
-	private final List<OperatorDescriptorSingle> possibleProperties;
-	
-	/**
-	 * Creates a new MapNode for the given contract.
-	 * 
-	 * @param operator The map partition contract object.
-	 */
-	public MapPartitionNode(SingleInputOperator<?, ?, ?> operator) {
-		super(operator);
-		
-		this.possibleProperties = Collections.<OperatorDescriptorSingle>singletonList(new MapPartitionDescriptor());
-	}
-
-	@Override
-	public String getName() {
-		return "MapPartition";
-	}
-
-	@Override
-	protected List<OperatorDescriptorSingle> getPossibleProperties() {
-		return this.possibleProperties;
-	}
-
-	/**
-	 * Computes the estimates for the MapPartition operator.
-	 * We assume that by default, Map takes one value and transforms it into another value.
-	 * The cardinality consequently stays the same.
-	 */
-	@Override
-	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
-		// we really cannot make any estimates here
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java
deleted file mode 100644
index de3cd22..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.operators.HashJoinBuildFirstProperties;
-import org.apache.flink.optimizer.operators.HashJoinBuildSecondProperties;
-import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
-import org.apache.flink.optimizer.operators.SortMergeJoinDescriptor;
-import org.apache.flink.configuration.Configuration;
-
-/**
- * The Optimizer representation of a join operator.
- */
-public class MatchNode extends TwoInputNode {
-	
-	private List<OperatorDescriptorDual> dataProperties;
-	
-	/**
-	 * Creates a new MatchNode for the given join operator.
-	 * 
-	 * @param joinOperatorBase The join operator object.
-	 */
-	public MatchNode(JoinOperatorBase<?, ?, ?, ?> joinOperatorBase) {
-		super(joinOperatorBase);
-		this.dataProperties = getDataProperties(joinOperatorBase, joinOperatorBase.getJoinHint());
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Gets the contract object for this match node.
-	 * 
-	 * @return The contract.
-	 */
-	@Override
-	public JoinOperatorBase<?, ?, ?, ?> getOperator() {
-		return (JoinOperatorBase<?, ?, ?, ?>) super.getOperator();
-	}
-
-	@Override
-	public String getName() {
-		return "Join";
-	}
-
-	@Override
-	protected List<OperatorDescriptorDual> getPossibleProperties() {
-		return this.dataProperties;
-	}
-	
-	public void makeJoinWithSolutionSet(int solutionsetInputIndex) {
-		OperatorDescriptorDual op;
-		if (solutionsetInputIndex == 0) {
-			op = new HashJoinBuildFirstProperties(this.keys1, this.keys2);
-		} else if (solutionsetInputIndex == 1) {
-			op = new HashJoinBuildSecondProperties(this.keys1, this.keys2);
-		} else {
-			throw new IllegalArgumentException();
-		}
-		
-		this.dataProperties = Collections.singletonList(op);
-	}
-	
-	/**
-	 * The default estimates build on the principle of inclusion: The smaller input key domain is included in the larger
-	 * input key domain. We also assume that every key from the larger input has one join partner in the smaller input.
-	 * The result cardinality is hence the larger one.
-	 */
-	@Override
-	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
-		long card1 = getFirstPredecessorNode().getEstimatedNumRecords();
-		long card2 = getSecondPredecessorNode().getEstimatedNumRecords();
-		this.estimatedNumRecords = (card1 < 0 || card2 < 0) ? -1 : Math.max(card1, card2);
-		
-		if (this.estimatedNumRecords >= 0) {
-			float width1 = getFirstPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
-			float width2 = getSecondPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
-			float width = (width1 <= 0 || width2 <= 0) ? -1 : width1 + width2;
-			
-			if (width > 0) {
-				this.estimatedOutputSize = (long) (width * this.estimatedNumRecords);
-			}
-		}
-	}
-	
-	private List<OperatorDescriptorDual> getDataProperties(JoinOperatorBase<?, ?, ?, ?> joinOperatorBase, JoinHint joinHint) {
-		// see if an internal hint dictates the strategy to use
-		Configuration conf = joinOperatorBase.getParameters();
-		String localStrategy = conf.getString(Optimizer.HINT_LOCAL_STRATEGY, null);
-
-		if (localStrategy != null) {
-			final OperatorDescriptorDual fixedDriverStrat;
-			if (Optimizer.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE.equals(localStrategy) ||
-				Optimizer.HINT_LOCAL_STRATEGY_SORT_FIRST_MERGE.equals(localStrategy) ||
-				Optimizer.HINT_LOCAL_STRATEGY_SORT_SECOND_MERGE.equals(localStrategy) ||
-				Optimizer.HINT_LOCAL_STRATEGY_MERGE.equals(localStrategy) )
-			{
-				fixedDriverStrat = new SortMergeJoinDescriptor(this.keys1, this.keys2);
-			} else if (Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST.equals(localStrategy)) {
-				fixedDriverStrat = new HashJoinBuildFirstProperties(this.keys1, this.keys2);
-			} else if (Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND.equals(localStrategy)) {
-				fixedDriverStrat = new HashJoinBuildSecondProperties(this.keys1, this.keys2);
-			} else {
-				throw new CompilerException("Invalid local strategy hint for match contract: " + localStrategy);
-			}
-			ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
-			list.add(fixedDriverStrat);
-			return list;
-		}
-		else {
-			ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
-			
-			joinHint = joinHint == null ? JoinHint.OPTIMIZER_CHOOSES : joinHint;
-			
-			switch (joinHint) {
-				case BROADCAST_HASH_FIRST:
-					list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, true, false, false));
-					break;
-				case BROADCAST_HASH_SECOND:
-					list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, true, false));
-					break;
-				case REPARTITION_HASH_FIRST:
-					list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, false, false, true));
-					break;
-				case REPARTITION_HASH_SECOND:
-					list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, false, true));
-					break;
-				case REPARTITION_SORT_MERGE:
-					list.add(new SortMergeJoinDescriptor(this.keys1, this.keys2, false, false, true));
-					break;
-				case OPTIMIZER_CHOOSES:
-					list.add(new SortMergeJoinDescriptor(this.keys1, this.keys2));
-					list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2));
-					list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2));
-					break;
-				default:
-					throw new CompilerException("Unrecognized join hint: " + joinHint);
-			}
-			
-			return list;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/NoOpNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/NoOpNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/NoOpNode.java
deleted file mode 100644
index 76467cf..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/NoOpNode.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.dag;
-
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.operators.NoOpDescriptor;
-
-/**
- * The optimizer's internal representation of a <i>No Operation</i> node.
- */
-public class NoOpNode extends UnaryOperatorNode {
-	
-	public NoOpNode() {
-		super("No Op", new FieldSet(), new NoOpDescriptor());
-	}
-	
-	public NoOpNode(String name) {
-		super(name, new FieldSet(), new NoOpDescriptor());
-	}
-	
-	@Override
-	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
-		this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords();
-		this.estimatedOutputSize = getPredecessorNode().getEstimatedOutputSize();
-	}
-}