Posted to commits@flink.apache.org by se...@apache.org on 2014/07/09 12:21:35 UTC

[08/12] git commit: Fix cost formulas for cached hash joins Fix estimation bug in channels Add tests for isolated cost formulas Add tests for channel class in plan candidates

Fix cost formulas for cached hash joins
Fix estimation bug in channels
Add tests for isolated cost formulas
Add tests for channel class in plan candidates


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/03c6160e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/03c6160e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/03c6160e

Branch: refs/heads/master
Commit: 03c6160e47bc2ab613a2734388aa22f83323e6a5
Parents: ce65fb6
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jul 9 03:03:21 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jul 9 03:09:48 2014 +0200

----------------------------------------------------------------------
 .../compiler/costs/CostEstimator.java           |  20 +-
 .../compiler/costs/DefaultCostEstimator.java    |  38 +-
 .../compiler/dag/OptimizerNode.java             |   8 +
 .../eu/stratosphere/compiler/plan/Channel.java  |   6 +-
 .../costs/DefaultCostEstimatorTest.java         | 425 +++++++++++++++++++
 .../stratosphere/compiler/plan/ChannelTest.java |  87 ++++
 6 files changed, 559 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/03c6160e/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java
index 3f0c094..e45ed69 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java
@@ -40,13 +40,13 @@ public abstract class CostEstimator {
 
 	public abstract void addFileInputCost(long fileSizeInBytes, Costs costs);
 	
-	public abstract void addLocalSortCost(EstimateProvider estimates, long memorySize, Costs costs);
+	public abstract void addLocalSortCost(EstimateProvider estimates, Costs costs);
 	
-	public abstract void addLocalMergeCost(EstimateProvider estimates1, EstimateProvider estimates2, long memorySize, Costs costs, int costWeight);
+	public abstract void addLocalMergeCost(EstimateProvider estimates1, EstimateProvider estimates2, Costs costs, int costWeight);
 	
-	public abstract void addHybridHashCosts(EstimateProvider buildSide, EstimateProvider probeSide, long memorySize, Costs costs, int costWeight);
+	public abstract void addHybridHashCosts(EstimateProvider buildSide, EstimateProvider probeSide, Costs costs, int costWeight);
 	
-	public abstract void addCachedHybridHashCosts(EstimateProvider buildSide, EstimateProvider probeSide, long memorySize, Costs costs, int costWeight);
+	public abstract void addCachedHybridHashCosts(EstimateProvider buildSide, EstimateProvider probeSide, Costs costs, int costWeight);
 
 	public abstract void addStreamedNestedLoopsCosts(EstimateProvider outerSide, EstimateProvider innerSide, long bufferSize, Costs costs, int costWeight);
 
@@ -116,7 +116,7 @@ public abstract class CostEstimator {
 				break;
 			case SORT:
 			case COMBININGSORT:
-				addLocalSortCost(channel, availableMemory, costs);
+				addLocalSortCost(channel, costs);
 				break;
 			default:
 				throw new CompilerException("Unsupported local strategy for input: " + channel.getLocalStrategy());
@@ -181,19 +181,19 @@ public abstract class CostEstimator {
 			
 			break;
 		case MERGE:
-			addLocalMergeCost(firstInput, secondInput, availableMemory, driverCosts, costWeight);
+			addLocalMergeCost(firstInput, secondInput, driverCosts, costWeight);
 			break;
 		case HYBRIDHASH_BUILD_FIRST:
-			addHybridHashCosts(firstInput, secondInput, availableMemory, driverCosts, costWeight);
+			addHybridHashCosts(firstInput, secondInput, driverCosts, costWeight);
 			break;
 		case HYBRIDHASH_BUILD_SECOND:
-			addHybridHashCosts(secondInput, firstInput, availableMemory, driverCosts, costWeight);
+			addHybridHashCosts(secondInput, firstInput, driverCosts, costWeight);
 			break;
 		case HYBRIDHASH_BUILD_FIRST_CACHED:
-			addCachedHybridHashCosts(firstInput, secondInput, availableMemory, driverCosts, costWeight);
+			addCachedHybridHashCosts(firstInput, secondInput, driverCosts, costWeight);
 			break;
 		case HYBRIDHASH_BUILD_SECOND_CACHED:
-			addCachedHybridHashCosts(secondInput, firstInput, availableMemory, driverCosts, costWeight);
+			addCachedHybridHashCosts(secondInput, firstInput, driverCosts, costWeight);
 			break;
 		case NESTEDLOOP_BLOCKED_OUTER_FIRST:
 			addBlockNestedLoopsCosts(firstInput, secondInput, availableMemory, driverCosts, costWeight);

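The memorySize parameter is dropped from these signatures because the corresponding formulas in DefaultCostEstimator do not use it. A minimal caller-side sketch against the new API (the estimate providers firstInput and secondInput are assumptions, not part of this diff):

	// costing a sort-merge join under the revised signatures: no memory size argument
	CostEstimator estimator = new DefaultCostEstimator();
	Costs costs = new Costs();
	estimator.addLocalSortCost(firstInput, costs);
	estimator.addLocalSortCost(secondInput, costs);
	estimator.addLocalMergeCost(firstInput, secondInput, costs, 1);

This is the same calling pattern the new DefaultCostEstimatorTest exercises below.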
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/03c6160e/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java
index 425f93c..93eb66a 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java
@@ -96,6 +96,9 @@ public class DefaultCostEstimator extends CostEstimator {
 	@Override
 	public void addBroadcastCost(EstimateProvider estimates, int replicationFactor, Costs costs) {
 		// if our replication factor is negative, we cannot calculate broadcast costs
+		if (replicationFactor <= 0) {
+			throw new IllegalArgumentException("The replication factor must be larger than zero.");
+		}
 
 		if (replicationFactor > 0) {
 		// assumption: we need to ship the whole data over the network to each node.
@@ -126,7 +129,7 @@ public class DefaultCostEstimator extends CostEstimator {
 	}
 	
 	@Override
-	public void addLocalSortCost(EstimateProvider estimates, long availableMemory, Costs costs) {
+	public void addLocalSortCost(EstimateProvider estimates, Costs costs) {
 		final long s = estimates.getEstimatedOutputSize();
 		// we assume a two phase merge sort, so all in all 2 I/O operations per block
 		if (s <= 0) {
@@ -141,25 +144,28 @@ public class DefaultCostEstimator extends CostEstimator {
 	}
 
 	@Override
-	public void addLocalMergeCost(EstimateProvider input1, EstimateProvider input2, long availableMemory, Costs costs, int costWeight) {
+	public void addLocalMergeCost(EstimateProvider input1, EstimateProvider input2, Costs costs, int costWeight) {
 		// costs nothing. the very rarely incurred cost for a spilling block nested loops join in the
-		// presence of massively re-occurring duplicate keys is ignored, because not accessible.
+		// presence of massively re-occurring duplicate keys is ignored, because it cannot be assessed
 	}
 
 	@Override
-	public void addHybridHashCosts(EstimateProvider buildSideInput, EstimateProvider probeSideInput, long availableMemory, Costs costs, int costWeight) {
+	public void addHybridHashCosts(EstimateProvider buildSideInput, EstimateProvider probeSideInput, Costs costs, int costWeight) {
 		long bs = buildSideInput.getEstimatedOutputSize();
 		long ps = probeSideInput.getEstimatedOutputSize();
 		
 		if (bs > 0 && ps > 0) {
-			costs.addDiskCost(2*bs + ps);
-			costs.addCpuCost((long) ((2*bs + ps) * HASHING_CPU_FACTOR));
+			long overall = 2*bs + ps;
+			costs.addDiskCost(overall);
+			costs.addCpuCost((long) (overall * HASHING_CPU_FACTOR));
 		} else {
 			costs.setDiskCost(Costs.UNKNOWN);
 			costs.setCpuCost(Costs.UNKNOWN);
 		}
 		costs.addHeuristicDiskCost(2 * HEURISTIC_COST_BASE);
-		costs.addHeuristicCpuCost((long) (HEURISTIC_COST_BASE * HASHING_CPU_FACTOR));
+		costs.addHeuristicCpuCost((long) (2 * HEURISTIC_COST_BASE * HASHING_CPU_FACTOR));
+		
+		// cost weight applies to everything
 		costs.multiplyWith(costWeight);
 	}
 	
@@ -168,20 +174,26 @@ public class DefaultCostEstimator extends CostEstimator {
 	 * We are assuming by default that half of the cached hash table fits into memory.
 	 */
 	@Override
-	public void addCachedHybridHashCosts(EstimateProvider buildSideInput, EstimateProvider probeSideInput, long availableMemory, Costs costs, int costWeight) {
+	public void addCachedHybridHashCosts(EstimateProvider buildSideInput, EstimateProvider probeSideInput, Costs costs, int costWeight) {
+		if (costWeight < 1) {
+			throw new IllegalArgumentException("The cost weight must be at least one.");
+		}
+		
 		long bs = buildSideInput.getEstimatedOutputSize();
 		long ps = probeSideInput.getEstimatedOutputSize();
 		
 		if (bs > 0 && ps > 0) {
-			long overallSize = 2*bs + ps;
-			costs.addDiskCost(overallSize / 2 + (overallSize / 2) * costWeight);
-			costs.addCpuCost((long) ((2*bs + ps) * HASHING_CPU_FACTOR));
+			long overall = 2*bs + costWeight*ps;
+			costs.addDiskCost(overall);
+			costs.addCpuCost((long) (overall * HASHING_CPU_FACTOR));
 		} else {
 			costs.setDiskCost(Costs.UNKNOWN);
 			costs.setCpuCost(Costs.UNKNOWN);
 		}
-		costs.addHeuristicDiskCost(2 * HEURISTIC_COST_BASE * costWeight);
-		costs.addHeuristicCpuCost((long) (HEURISTIC_COST_BASE * HASHING_CPU_FACTOR * costWeight));
+		
+		// once the build side, plus cost-weight times the probe side
+		costs.addHeuristicDiskCost((1 + costWeight) * HEURISTIC_COST_BASE);
+		costs.addHeuristicCpuCost((long) ((1 + costWeight) * HEURISTIC_COST_BASE * HASHING_CPU_FACTOR));
 	}
 
 	@Override

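A worked example of the corrected cached hybrid hash formula (the numbers are illustrative, not from the commit): the build side is charged once, because the hash table is cached after the first execution, while the probe side is charged once per execution.

	long bs = 100;                              // estimated build side output size
	long ps = 1000;                             // estimated probe side output size
	int costWeight = 5;                         // the join runs five times, e.g. inside an iteration

	long cached = 2 * bs + costWeight * ps;     // new cached formula: 200 + 5000 = 5200
	long plain  = costWeight * (2 * bs + ps);   // non-cached formula after multiplyWith(costWeight): 5 * 1200 = 6000

With costWeight == 1 both expressions reduce to 2*bs + ps, which is exactly what the new test asserts: at weight one the cached and non-cached variants cost the same.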
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/03c6160e/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java
index ec9bd69..88592e8 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java
@@ -491,6 +491,14 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 		return this.estimatedNumRecords;
 	}
 	
+	public void setEstimatedOutputSize(long estimatedOutputSize) {
+		this.estimatedOutputSize = estimatedOutputSize;
+	}
+
+	public void setEstimatedNumRecords(long estimatedNumRecords) {
+		this.estimatedNumRecords = estimatedNumRecords;
+	}
+	
 	public float getEstimatedAvgWidthPerOutputRecord() {
 		if (this.estimatedOutputSize > 0 && this.estimatedNumRecords > 0) {
 			return ((float) this.estimatedOutputSize) / this.estimatedNumRecords;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/03c6160e/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java
index 6f9418f..c604a45 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java
@@ -317,12 +317,14 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 
 	@Override
 	public long getEstimatedOutputSize() {
-		return this.source.template.getEstimatedOutputSize() * this.replicationFactor;
+		long estimate = this.source.template.getEstimatedOutputSize();
+		return estimate < 0 ? estimate : estimate * this.replicationFactor;
 	}
 
 	@Override
 	public long getEstimatedNumRecords() {
-		return this.source.template.getEstimatedNumRecords() * this.replicationFactor;
+		long estimate = this.source.template.getEstimatedNumRecords();
+		return estimate < 0 ? estimate : estimate * this.replicationFactor;
 	}
 	
 	@Override

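The estimation bug fixed here: an unknown estimate is encoded as -1, and the old getters multiplied that marker by the replication factor (for example -1 * 23 = -23), so callers comparing against -1 no longer recognized the estimate as unknown. A condensed sketch of the corrected behavior, mirroring the new ChannelTest below (planNode is a SourcePlanNode over a source without estimates, as constructed there):

	Channel channel = new Channel(planNode);
	channel.setReplicationFactor(23);

	channel.getEstimatedOutputSize();    // now returns -1 (unknown); previously -1 * 23 = -23
	channel.getEstimatedNumRecords();    // likewise -1; once estimates are set on the source,
	                                     // both getters scale by the replication factor again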
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/03c6160e/stratosphere-compiler/src/test/java/eu/stratosphere/compiler/costs/DefaultCostEstimatorTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/compiler/costs/DefaultCostEstimatorTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/compiler/costs/DefaultCostEstimatorTest.java
new file mode 100644
index 0000000..3b85ff4
--- /dev/null
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/compiler/costs/DefaultCostEstimatorTest.java
@@ -0,0 +1,425 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.compiler.costs;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import eu.stratosphere.compiler.dag.EstimateProvider;
+
+/**
+ * Tests for the cost formulas in the {@link DefaultCostEstimator}. Most of the tests check relative
+ * orderings between strategies, rather than absolute cost values.
+ */
+public class DefaultCostEstimatorTest {
+	
+	// estimates
+	
+	private static final long SMALL_DATA_SIZE = 10000;
+	private static final long SMALL_RECORD_COUNT = 100;
+	
+	private static final long MEDIUM_DATA_SIZE = 500000000L;
+	private static final long MEDIUM_RECORD_COUNT = 500000L;
+	
+	private static final long BIG_DATA_SIZE = 100000000000L;
+	private static final long BIG_RECORD_COUNT = 100000000L;
+	
+	private static final EstimateProvider UNKNOWN_ESTIMATES = new UnknownEstimates();
+	private static final EstimateProvider ZERO_ESTIMATES = new Estimates(0, 0);
+	private static final EstimateProvider SMALL_ESTIMATES = new Estimates(SMALL_DATA_SIZE, SMALL_RECORD_COUNT);
+	private static final EstimateProvider MEDIUM_ESTIMATES = new Estimates(MEDIUM_DATA_SIZE, MEDIUM_RECORD_COUNT);
+	private static final EstimateProvider BIG_ESTIMATES = new Estimates(BIG_DATA_SIZE, BIG_RECORD_COUNT);
+	
+	private final CostEstimator costEstimator = new DefaultCostEstimator();
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@Test
+	public void testShipStrategiesIsolated() {
+		testShipStrategiesIsolated(UNKNOWN_ESTIMATES, 1);
+		testShipStrategiesIsolated(UNKNOWN_ESTIMATES, 10);
+		testShipStrategiesIsolated(ZERO_ESTIMATES, 1);
+		testShipStrategiesIsolated(ZERO_ESTIMATES, 10);
+		testShipStrategiesIsolated(SMALL_ESTIMATES, 1);
+		testShipStrategiesIsolated(SMALL_ESTIMATES, 10);
+		testShipStrategiesIsolated(BIG_ESTIMATES, 1);
+		testShipStrategiesIsolated(BIG_ESTIMATES, 10);
+	}
+	
+	private void testShipStrategiesIsolated(EstimateProvider estimates, int targetParallelism) {
+		Costs random = new Costs();
+		costEstimator.addRandomPartitioningCost(estimates, random);
+		
+		Costs hash = new Costs();
+		costEstimator.addHashPartitioningCost(estimates, hash);
+		
+		Costs range = new Costs();
+		costEstimator.addRangePartitionCost(estimates, range);
+		
+		Costs broadcast = new Costs();
+		costEstimator.addBroadcastCost(estimates, targetParallelism, broadcast);
+		
+		int randomVsHash = random.compareTo(hash);
+		int hashVsRange = hash.compareTo(range);
+		int hashVsBroadcast = hash.compareTo(broadcast);
+		int rangeVsBroadcast = range.compareTo(broadcast);
+
+		// random repartitioning is at most as expensive as hash partitioning
+		assertTrue(randomVsHash <= 0);
+		
+		// range partitioning is always more expensive than hash partitioning
+		assertTrue(hashVsRange < 0);
+		
+		// broadcasting is more expensive than hash partitioning (at parallelism one it may tie)
+		if (targetParallelism > 1) {
+			assertTrue(hashVsBroadcast < 0);
+		} else {
+			assertTrue(hashVsBroadcast <= 0);
+		}
+		
+		// for parallelism greater than one, range partitioning is cheaper than broadcasting
+		if (targetParallelism > 1) {
+			assertTrue(rangeVsBroadcast < 0);
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@Test
+	public void testShipStrategyCombinationsPlain() {
+		Costs hashBothSmall = new Costs();
+		Costs hashSmallAndLarge = new Costs();
+		Costs hashBothLarge = new Costs();
+		
+		Costs hashSmallBcLarge10 = new Costs();
+		Costs hashLargeBcSmall10 = new Costs();
+		
+		Costs hashSmallBcLarge1000 = new Costs();
+		Costs hashLargeBcSmall1000 = new Costs();
+		
+		Costs forwardSmallBcLarge10 = new Costs();
+		Costs forwardLargeBcSmall10 = new Costs();
+		
+		Costs forwardSmallBcLarge1000 = new Costs();
+		Costs forwardLargeBcSmall1000 = new Costs();
+		
+		costEstimator.addHashPartitioningCost(MEDIUM_ESTIMATES, hashBothSmall);
+		costEstimator.addHashPartitioningCost(MEDIUM_ESTIMATES, hashBothSmall);
+		
+		costEstimator.addHashPartitioningCost(MEDIUM_ESTIMATES, hashSmallAndLarge);
+		costEstimator.addHashPartitioningCost(BIG_ESTIMATES, hashSmallAndLarge);
+		
+		costEstimator.addHashPartitioningCost(BIG_ESTIMATES, hashBothLarge);
+		costEstimator.addHashPartitioningCost(BIG_ESTIMATES, hashBothLarge);
+		
+		costEstimator.addHashPartitioningCost(MEDIUM_ESTIMATES, hashSmallBcLarge10);
+		costEstimator.addBroadcastCost(BIG_ESTIMATES, 10, hashSmallBcLarge10);
+		
+		costEstimator.addHashPartitioningCost(BIG_ESTIMATES, hashLargeBcSmall10);
+		costEstimator.addBroadcastCost(MEDIUM_ESTIMATES, 10, hashLargeBcSmall10);
+		
+		costEstimator.addHashPartitioningCost(MEDIUM_ESTIMATES, hashSmallBcLarge1000);
+		costEstimator.addBroadcastCost(BIG_ESTIMATES, 1000, hashSmallBcLarge1000);
+		
+		costEstimator.addHashPartitioningCost(BIG_ESTIMATES, hashLargeBcSmall1000);
+		costEstimator.addBroadcastCost(MEDIUM_ESTIMATES, 1000, hashLargeBcSmall1000);
+		
+		costEstimator.addBroadcastCost(BIG_ESTIMATES, 10, forwardSmallBcLarge10);
+		
+		costEstimator.addBroadcastCost(MEDIUM_ESTIMATES, 10, forwardLargeBcSmall10);
+		
+		costEstimator.addBroadcastCost(BIG_ESTIMATES, 1000, forwardSmallBcLarge1000);
+		
+		costEstimator.addBroadcastCost(MEDIUM_ESTIMATES, 1000, forwardLargeBcSmall1000);
+		
+		// hash cost is roughly monotonic
+		assertTrue(hashBothSmall.compareTo(hashSmallAndLarge) < 0);
+		assertTrue(hashSmallAndLarge.compareTo(hashBothLarge) < 0);
+		
+		// broadcasting the smaller side is better
+		assertTrue(hashLargeBcSmall10.compareTo(hashSmallBcLarge10) < 0);
+		assertTrue(forwardLargeBcSmall10.compareTo(forwardSmallBcLarge10) < 0);
+		assertTrue(hashLargeBcSmall1000.compareTo(hashSmallBcLarge1000) < 0);
+		assertTrue(forwardLargeBcSmall1000.compareTo(forwardSmallBcLarge1000) < 0);
+		
+		// broadcasting the small side and forwarding the large one is better than partitioning both, given the size difference
+		assertTrue(forwardLargeBcSmall10.compareTo(hashSmallAndLarge) < 0);
+		
+		// broadcasting too far is expensive again
+		assertTrue(forwardLargeBcSmall1000.compareTo(hashSmallAndLarge) > 0);
+		
+		// a higher replication factor costs more
+		assertTrue(hashSmallBcLarge10.compareTo(hashSmallBcLarge1000) < 0);
+		assertTrue(hashLargeBcSmall10.compareTo(hashLargeBcSmall1000) < 0);
+		assertTrue(forwardSmallBcLarge10.compareTo(forwardSmallBcLarge1000) < 0);
+		assertTrue(forwardLargeBcSmall10.compareTo(forwardLargeBcSmall1000) < 0);
+		
+		// forward versus hash
+		assertTrue(forwardSmallBcLarge10.compareTo(hashSmallBcLarge10) < 0);
+		assertTrue(forwardSmallBcLarge1000.compareTo(hashSmallBcLarge1000) < 0);
+		assertTrue(forwardLargeBcSmall10.compareTo(hashLargeBcSmall10) < 0);
+		assertTrue(forwardLargeBcSmall1000.compareTo(hashLargeBcSmall1000) < 0);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@Test
+	public void testShipStrategyCombinationsWithUnknowns() {
+		testShipStrategyCombinationsWithUnknowns(UNKNOWN_ESTIMATES);
+		testShipStrategyCombinationsWithUnknowns(ZERO_ESTIMATES);
+		testShipStrategyCombinationsWithUnknowns(SMALL_ESTIMATES);
+		testShipStrategyCombinationsWithUnknowns(MEDIUM_ESTIMATES);
+		testShipStrategyCombinationsWithUnknowns(BIG_ESTIMATES);
+	}
+	
+	private void testShipStrategyCombinationsWithUnknowns(EstimateProvider knownEstimates) {
+		Costs hashBoth = new Costs();
+		Costs bcKnown10 = new Costs();
+		Costs bcUnknown10 = new Costs();
+		Costs bcKnown1000 = new Costs();
+		Costs bcUnknown1000 = new Costs();
+		
+		costEstimator.addHashPartitioningCost(knownEstimates, hashBoth);
+		costEstimator.addHashPartitioningCost(UNKNOWN_ESTIMATES, hashBoth);
+		
+		costEstimator.addBroadcastCost(knownEstimates, 10, bcKnown10);
+		
+		costEstimator.addBroadcastCost(UNKNOWN_ESTIMATES, 10, bcUnknown10);
+		
+		costEstimator.addBroadcastCost(knownEstimates, 1000, bcKnown1000);
+		
+		costEstimator.addBroadcastCost(UNKNOWN_ESTIMATES, 1000, bcUnknown1000);
+		
+		// if one input size is unknown, hashing both should be cheaper than any broadcast variant
+		assertTrue(hashBoth.compareTo(bcKnown10) < 0);
+		assertTrue(hashBoth.compareTo(bcUnknown10) < 0);
+		assertTrue(hashBoth.compareTo(bcKnown1000) < 0);
+		assertTrue(hashBoth.compareTo(bcUnknown1000) < 0);
+		
+		// there should be no bias in broadcasting a known or unknown size input
+		assertTrue(bcKnown10.compareTo(bcUnknown10) == 0);
+		assertTrue(bcKnown1000.compareTo(bcUnknown1000) == 0);
+		
+		// replication factor does matter
+		assertTrue(bcKnown10.compareTo(bcKnown1000) < 0);
+		assertTrue(bcUnknown10.compareTo(bcUnknown1000) < 0);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@Test
+	public void testJoinCostFormulasPlain() {
+		
+		// hash join costs
+		
+		Costs hashBothSmall = new Costs();
+		Costs hashBothLarge = new Costs();
+		Costs hashSmallBuild = new Costs();
+		Costs hashLargeBuild = new Costs();
+		
+		costEstimator.addHybridHashCosts(SMALL_ESTIMATES, BIG_ESTIMATES, hashSmallBuild, 1);
+		costEstimator.addHybridHashCosts(BIG_ESTIMATES, SMALL_ESTIMATES, hashLargeBuild, 1);
+		costEstimator.addHybridHashCosts(SMALL_ESTIMATES, SMALL_ESTIMATES, hashBothSmall, 1);
+		costEstimator.addHybridHashCosts(BIG_ESTIMATES, BIG_ESTIMATES, hashBothLarge, 1);
+
+		assertTrue(hashBothSmall.compareTo(hashSmallBuild) < 0);
+		assertTrue(hashSmallBuild.compareTo(hashLargeBuild) < 0);
+		assertTrue(hashLargeBuild.compareTo(hashBothLarge) < 0);
+		
+		// merge join costs
+		
+		Costs mergeBothSmall = new Costs();
+		Costs mergeBothLarge = new Costs();
+		Costs mergeSmallFirst = new Costs();
+		Costs mergeSmallSecond = new Costs();
+		
+		costEstimator.addLocalSortCost(SMALL_ESTIMATES, mergeSmallFirst);
+		costEstimator.addLocalSortCost(BIG_ESTIMATES, mergeSmallFirst);
+		costEstimator.addLocalMergeCost(SMALL_ESTIMATES, BIG_ESTIMATES, mergeSmallFirst, 1);
+		
+		costEstimator.addLocalSortCost(BIG_ESTIMATES, mergeSmallSecond);
+		costEstimator.addLocalSortCost(SMALL_ESTIMATES, mergeSmallSecond);
+		costEstimator.addLocalMergeCost(BIG_ESTIMATES, SMALL_ESTIMATES, mergeSmallSecond, 1);
+		
+		costEstimator.addLocalSortCost(SMALL_ESTIMATES, mergeBothSmall);
+		costEstimator.addLocalSortCost(SMALL_ESTIMATES, mergeBothSmall);
+		costEstimator.addLocalMergeCost(SMALL_ESTIMATES, SMALL_ESTIMATES, mergeBothSmall, 1);
+		
+		costEstimator.addLocalSortCost(BIG_ESTIMATES, mergeBothLarge);
+		costEstimator.addLocalSortCost(BIG_ESTIMATES, mergeBothLarge);
+		costEstimator.addLocalMergeCost(BIG_ESTIMATES, BIG_ESTIMATES, mergeBothLarge, 1);
+		
+		
+		assertTrue(mergeBothSmall.compareTo(mergeSmallFirst) < 0);
+		assertTrue(mergeBothSmall.compareTo(mergeSmallSecond) < 0);
+		assertTrue(mergeSmallFirst.compareTo(mergeSmallSecond) == 0);
+		assertTrue(mergeSmallFirst.compareTo(mergeBothLarge) < 0);
+		assertTrue(mergeSmallSecond.compareTo(mergeBothLarge) < 0);
+		
+		// compare merge join and hash join costs
+		
+		assertTrue(hashBothSmall.compareTo(mergeBothSmall) < 0);
+		assertTrue(hashBothLarge.compareTo(mergeBothLarge) < 0);
+		assertTrue(hashSmallBuild.compareTo(mergeSmallFirst) < 0);
+		assertTrue(hashSmallBuild.compareTo(mergeSmallSecond) < 0);
+		assertTrue(hashLargeBuild.compareTo(mergeSmallFirst) < 0);
+		assertTrue(hashLargeBuild.compareTo(mergeSmallSecond) < 0);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@Test
+	public void testJoinCostFormulasWithWeights() {
+		testJoinCostFormulasWithWeights(UNKNOWN_ESTIMATES, SMALL_ESTIMATES);
+		testJoinCostFormulasWithWeights(SMALL_ESTIMATES, UNKNOWN_ESTIMATES);
+		testJoinCostFormulasWithWeights(UNKNOWN_ESTIMATES, MEDIUM_ESTIMATES);
+		testJoinCostFormulasWithWeights(MEDIUM_ESTIMATES, UNKNOWN_ESTIMATES);
+		testJoinCostFormulasWithWeights(BIG_ESTIMATES, MEDIUM_ESTIMATES);
+		testJoinCostFormulasWithWeights(MEDIUM_ESTIMATES, BIG_ESTIMATES);
+	}
+	
+	private void testJoinCostFormulasWithWeights(EstimateProvider e1, EstimateProvider e2) {
+		Costs hf1 = new Costs();
+		Costs hf5 = new Costs();
+		Costs hs1 = new Costs();
+		Costs hs5 = new Costs();
+		Costs mm1 = new Costs();
+		Costs mm5 = new Costs();
+		
+		costEstimator.addHybridHashCosts(e1, e2, hf1, 1);
+		costEstimator.addHybridHashCosts(e1, e2, hf5, 5);
+		costEstimator.addHybridHashCosts(e2, e1, hs1, 1);
+		costEstimator.addHybridHashCosts(e2, e1, hs5, 5);
+		
+		costEstimator.addLocalSortCost(e1, mm1);
+		costEstimator.addLocalSortCost(e2, mm1);
+		costEstimator.addLocalMergeCost(e1, e2, mm1, 1);
+		
+		costEstimator.addLocalSortCost(e1, mm5);
+		costEstimator.addLocalSortCost(e2, mm5);
+		mm5.multiplyWith(5);
+		costEstimator.addLocalMergeCost(e1, e2, mm5, 5);
+		
+		// weight 1 versus weight 5
+		assertTrue(hf1.compareTo(hf5) < 0);
+		assertTrue(hs1.compareTo(hs5) < 0);
+		assertTrue(mm1.compareTo(mm5) < 0);
+		
+		// hash versus merge
+		assertTrue(hf1.compareTo(mm1) < 0);
+		assertTrue(hs1.compareTo(mm1) < 0);
+		assertTrue(hf5.compareTo(mm5) < 0);
+		assertTrue(hs5.compareTo(mm5) < 0);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@Test
+	public void testHashJoinCostFormulasWithCaches() {
+		
+		Costs hashBothUnknown10 = new Costs();
+		Costs hashBothUnknownCached10 = new Costs();
+		
+		Costs hashBothSmall10 = new Costs();
+		Costs hashBothSmallCached10 = new Costs();
+		
+		Costs hashSmallLarge10 = new Costs();
+		Costs hashSmallLargeCached10 = new Costs();
+		
+		Costs hashLargeSmall10 = new Costs();
+		Costs hashLargeSmallCached10 = new Costs();
+		
+		Costs hashLargeSmall1 = new Costs();
+		Costs hashLargeSmallCached1 = new Costs();
+		
+		costEstimator.addHybridHashCosts(UNKNOWN_ESTIMATES, UNKNOWN_ESTIMATES, hashBothUnknown10, 10);
+		costEstimator.addCachedHybridHashCosts(UNKNOWN_ESTIMATES, UNKNOWN_ESTIMATES, hashBothUnknownCached10, 10);
+		
+		costEstimator.addHybridHashCosts(MEDIUM_ESTIMATES, MEDIUM_ESTIMATES, hashBothSmall10, 10);
+		costEstimator.addCachedHybridHashCosts(MEDIUM_ESTIMATES, MEDIUM_ESTIMATES, hashBothSmallCached10, 10);
+		
+		costEstimator.addHybridHashCosts(MEDIUM_ESTIMATES, BIG_ESTIMATES, hashSmallLarge10, 10);
+		costEstimator.addCachedHybridHashCosts(MEDIUM_ESTIMATES, BIG_ESTIMATES, hashSmallLargeCached10, 10);
+		
+		costEstimator.addHybridHashCosts(BIG_ESTIMATES, MEDIUM_ESTIMATES, hashLargeSmall10, 10);
+		costEstimator.addCachedHybridHashCosts(BIG_ESTIMATES, MEDIUM_ESTIMATES, hashLargeSmallCached10, 10);
+		
+		costEstimator.addHybridHashCosts(BIG_ESTIMATES, MEDIUM_ESTIMATES, hashLargeSmall1, 1);
+		costEstimator.addCachedHybridHashCosts(BIG_ESTIMATES, MEDIUM_ESTIMATES, hashLargeSmallCached1, 1);
+		
+		// cached variant is always cheaper
+		assertTrue(hashBothUnknown10.compareTo(hashBothUnknownCached10) > 0);
+		assertTrue(hashBothSmall10.compareTo(hashBothSmallCached10) > 0);
+		assertTrue(hashSmallLarge10.compareTo(hashSmallLargeCached10) > 0);
+		assertTrue(hashLargeSmall10.compareTo(hashLargeSmallCached10) > 0);
+		
+		// caching the large side is better, because the repeated I/O then falls on the small probe side
+		assertTrue(hashLargeSmallCached10.compareTo(hashSmallLargeCached10) < 0);
+		
+		// a weight of one makes the cached variant cost the same as the non-cached one
+		assertTrue(hashLargeSmall1.compareTo(hashLargeSmallCached1) == 0);
+	}
+	
+	
+	// --------------------------------------------------------------------------------------------
+	//  Estimate providers
+	// --------------------------------------------------------------------------------------------
+	
+	private static final class UnknownEstimates implements EstimateProvider {
+
+		@Override
+		public long getEstimatedOutputSize() { return -1; }
+
+		@Override
+		public long getEstimatedNumRecords() { return -1; }
+
+		@Override
+		public float getEstimatedAvgWidthPerOutputRecord() { return -1.0f; }
+	}
+	
+	private static final class Estimates implements EstimateProvider {
+		
+		private final long size;
+		private final long records;
+		private final float width;
+		
+		public Estimates(long size, long records) {
+			this(size, records, -1.0f);
+		}
+		
+		public Estimates(long size, long records, float width) {
+			this.size = size;
+			this.records = records;
+			this.width = width;
+		}
+
+		@Override
+		public long getEstimatedOutputSize() {
+			return this.size;
+		}
+
+		@Override
+		public long getEstimatedNumRecords() {
+			return this.records;
+		}
+
+		@Override
+		public float getEstimatedAvgWidthPerOutputRecord() {
+			return this.width;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/03c6160e/stratosphere-compiler/src/test/java/eu/stratosphere/compiler/plan/ChannelTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/compiler/plan/ChannelTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/compiler/plan/ChannelTest.java
new file mode 100644
index 0000000..f3af5d7
--- /dev/null
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/compiler/plan/ChannelTest.java
@@ -0,0 +1,87 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.compiler.plan;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import eu.stratosphere.api.common.operators.OperatorInformation;
+import eu.stratosphere.api.common.operators.base.GenericDataSourceBase;
+import eu.stratosphere.api.java.io.TextInputFormat;
+import eu.stratosphere.api.java.typeutils.BasicTypeInfo;
+import eu.stratosphere.compiler.dag.DataSourceNode;
+import eu.stratosphere.core.fs.Path;
+
+public class ChannelTest {
+	
+	@Test
+	public void testGetEstimatesNoReplicationFactor() {
+		final long NUM_RECORD = 1001;
+		final long SIZE = 467131;
+		
+		DataSourceNode source = getSourceNode();
+		SourcePlanNode planNode = new SourcePlanNode(source, "test node");
+		Channel channel = new Channel(planNode);
+
+		// no estimates here
+		Assert.assertEquals(-1, channel.getEstimatedOutputSize());
+		Assert.assertEquals(-1, channel.getEstimatedNumRecords());
+		
+		// set estimates
+		source.setEstimatedNumRecords(NUM_RECORD);
+		source.setEstimatedOutputSize(SIZE);
+		Assert.assertEquals(SIZE, channel.getEstimatedOutputSize());
+		Assert.assertEquals(NUM_RECORD, channel.getEstimatedNumRecords());
+	}
+	
+	@Test
+	public void testGetEstimatesWithReplicationFactor() {
+		final long NUM_RECORD = 1001;
+		final long SIZE = 467131;
+		
+		final int REPLICATION = 23;
+		
+		DataSourceNode source = getSourceNode();
+		SourcePlanNode planNode = new SourcePlanNode(source, "test node");
+		Channel channel = new Channel(planNode);
+		channel.setReplicationFactor(REPLICATION);
+
+		// no estimates here
+		Assert.assertEquals(-1, channel.getEstimatedOutputSize());
+		Assert.assertEquals(-1, channel.getEstimatedNumRecords());
+		
+		// set estimates
+		source.setEstimatedNumRecords(NUM_RECORD);
+		source.setEstimatedOutputSize(SIZE);
+		Assert.assertEquals(SIZE * REPLICATION, channel.getEstimatedOutputSize());
+		Assert.assertEquals(NUM_RECORD * REPLICATION, channel.getEstimatedNumRecords());
+	}
+	
+	
+//	private static final OptimizerNode getSingleInputNode() {
+//		return new MapNode(new MapOperatorBase<String, String, GenericMap<String,String>>(
+//				new IdentityMapper<String>(),
+//				new UnaryOperatorInformation<String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
+//				"map"));
+//	}
+	
+	private static final DataSourceNode getSourceNode() {
+		return new DataSourceNode(new GenericDataSourceBase<String, TextInputFormat>(
+				new TextInputFormat(new Path("/ignored")), 
+				new OperatorInformation<String>(BasicTypeInfo.STRING_TYPE_INFO),
+				"source"));
+	}
+}