Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:07:29 UTC

[50/53] [abbrv] flink git commit: [FLINK-441] [optimizer] Rename Pact* and Nephele* classes

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/InterestingProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/InterestingProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/InterestingProperties.java
index ced0e83..6946641 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/InterestingProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/InterestingProperties.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.optimizer.dataproperties;
 
 import java.util.HashSet;
@@ -29,13 +28,16 @@ import org.apache.flink.optimizer.dag.SingleInputNode;
 import org.apache.flink.optimizer.dag.TwoInputNode;
 
 /**
- * The interesting properties that a node in the optimizer plan hands to its predecessors. It has the
- * purpose to tell the preceding nodes, which data properties might have the advantage, because they would
- * let the node fulfill its pact cheaper. More on optimization with interesting properties can be found
- * in the works on the volcano- and cascades optimizer framework.
+ * Interesting properties are propagated from parent operators to child operators. They tell the child
+ * which data properties would help the parent operate more cheaply. A reduce operator, for example,
+ * tells its child that partitioned data would help. If the child is a join operator, it can use that
+ * knowledge to favor strategies that leave the data in a partitioned form.
+ *
+ * More on optimization with interesting properties can be found in the literature on
+ * the Volcano and Cascades optimizer frameworks.
  */
-public class InterestingProperties implements Cloneable
-{
+public class InterestingProperties implements Cloneable {
+
 	private Set<RequestedGlobalProperties> globalProps; // the global properties, i.e. properties across partitions
 
 	private Set<RequestedLocalProperties> localProps; // the local properties, i.e. properties within partitions
@@ -91,8 +93,7 @@ public class InterestingProperties implements Cloneable
 		return this.globalProps;
 	}
 
-	public InterestingProperties filterByCodeAnnotations(OptimizerNode node, int input)
-	{
+	public InterestingProperties filterByCodeAnnotations(OptimizerNode node, int input) {
 		InterestingProperties iProps = new InterestingProperties();
 		SemanticProperties props;
 		if (node instanceof SingleInputNode || node instanceof TwoInputNode) {

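As a supplement to the rewritten Javadoc above: the following self-contained sketch illustrates the idea of interesting properties with toy types (hypothetical names, not the optimizer classes touched by this commit). A reduce-like parent announces that hash-partitioned input would make it cheaper, and a join-like child uses that hint to prefer a strategy whose output is already partitioned.

import java.util.EnumSet;
import java.util.Set;

// Toy illustration of "interesting properties": the parent announces which data
// properties would make it cheaper, and the child picks a strategy accordingly.
public class InterestingPropertiesSketch {

    enum DataProperty { HASH_PARTITIONED_ON_KEY, SORTED_ON_KEY }

    // A reduce-like parent: grouping is cheapest on data partitioned by the key.
    static Set<DataProperty> interestingForReduce() {
        return EnumSet.of(DataProperty.HASH_PARTITIONED_ON_KEY);
    }

    // A join-like child: if partitioned output interests the parent, prefer a
    // strategy that already leaves the data partitioned (a repartition join
    // rather than a broadcast join), so the parent avoids an extra shuffle.
    static String chooseJoinStrategy(Set<DataProperty> interesting) {
        return interesting.contains(DataProperty.HASH_PARTITIONED_ON_KEY)
                ? "repartition join (output stays hash-partitioned on the key)"
                : "broadcast join (output partitioning is arbitrary)";
    }

    public static void main(String[] args) {
        System.out.println(chooseJoinStrategy(interestingForReduce()));
    }
}
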
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/LocalProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/LocalProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/LocalProperties.java
index 0c3ea12..e0231aa 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/LocalProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/LocalProperties.java
@@ -31,7 +31,7 @@ import org.slf4j.LoggerFactory;
 
 /**
  * This class represents local properties of the data. A local property is a property that exists
- * within the data of a single partition.
+ * within the data of a single partition, such as sort order, or data grouping.
  */
 public class LocalProperties implements Cloneable {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/PartitioningProperty.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/PartitioningProperty.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/PartitioningProperty.java
index 674cdb8..5e06dd3 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/PartitioningProperty.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/PartitioningProperty.java
@@ -19,42 +19,45 @@
 package org.apache.flink.optimizer.dataproperties;
 
 /**
- * An enumeration tracking the different types of sharding strategies.
+ * An enumeration of the different ways of distributing data across partitions or
+ * parallel workers.
  */
 public enum PartitioningProperty {
 
 	/**
-	 * Any data distribution, i.e., random partitioning or full replication.
+	 * Any possible way of data distribution, including random partitioning and full replication.
 	 */
 	ANY_DISTRIBUTION,
 
 	/**
-	 * Constant indicating no particular partitioning (i.e. random) data distribution.
+	 * A random disjunct (non-replicated) data distribution, where each datum is contained in one partition only.
+	 * This is for example the result of parallel scans of data in a file system like HDFS,
+	 * or the result of a round-robin data distribution.
 	 */
 	RANDOM_PARTITIONED,
 
 	/**
-	 * Constant indicating a hash partitioning.
+	 * A hash partitioning on a certain key.
 	 */
 	HASH_PARTITIONED,
 
 	/**
-	 * Constant indicating a range partitioning.
+	 * A range partitioning on a certain key.
 	 */
 	RANGE_PARTITIONED,
 
 	/**
-	 * Constant indicating any not further specified disjunct partitioning.
+	 * A partitioning on a key that is not further specified (hash partitioning, range partitioning, or some other scheme).
 	 */
 	ANY_PARTITIONING,
 	
 	/**
-	 * Constant indicating full replication of the data to each parallel instance.
+	 * Full replication of the data to each parallel instance.
 	 */
 	FULL_REPLICATION,
 
 	/**
-	 * Constant indicating a forced even re-balancing.
+	 * A forced even re-balancing. All partitions are guaranteed to have almost the same number of records.
 	 */
 	FORCED_REBALANCED,
 	
@@ -95,10 +98,10 @@ public enum PartitioningProperty {
 
 	/**
 	 * Checks, if this property represents a partitioning that is computable.
-	 * Computable partitionings can be recreated through an algorithm. If two sets of data are to
+	 * A computable partitioning can be recreated through an algorithm. If two sets of data are to
 	 * be co-partitioned, it is crucial that the partitioning schemes are computable.
 	 * <p>
-	 * Examples for computable partitioning schemes are hash- or range-partitionings. An example for a non-computable
+	 * Examples of computable partitioning schemes are hash- or range-partitioning. An example of a non-computable
 	 * partitioning is the implicit partitioning that exists through a globally unique key.
 	 * 
 	 * @return True, if this enum constant is a re-computable partitioning.

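The notion of a computable partitioning described in the Javadoc above can be shown with a small standalone example (illustrative code only, not part of the Flink sources): because the partition index is a pure function of the key, two independent inputs can be co-partitioned by applying the same function on both sides, which is not possible for the implicit partitioning that exists through a globally unique key.

import java.util.Arrays;
import java.util.List;

// Toy illustration of a *computable* partitioning: the partition index is a pure
// function of the key, so two independent inputs can be co-partitioned by simply
// applying the same function to the join key on each side.
public class ComputablePartitioningSketch {

    static int hashPartition(Object key, int numPartitions) {
        // mask the sign bit so the index is always in [0, numPartitions)
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        List<String> inputA = Arrays.asList("alice", "bob", "carol");
        List<String> inputB = Arrays.asList("bob", "carol", "dave");

        // Same key, same partition index on both inputs, hence co-partitioned.
        for (String key : inputA) {
            System.out.println("A: " + key + " -> partition " + hashPartition(key, parallelism));
        }
        for (String key : inputB) {
            System.out.println("B: " + key + " -> partition " + hashPartition(key, parallelism));
        }
    }
}
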
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedLocalProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedLocalProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedLocalProperties.java
index 102a0f0..a9d14e8 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedLocalProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedLocalProperties.java
@@ -29,14 +29,16 @@ import org.apache.flink.optimizer.util.Utils;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
 
 /**
- * This class represents local properties of the data. A local property is a property that exists
- * within the data of a single partition.
+ * This class represents the local properties of the data that are requested by an operator.
+ * Local properties are the properties within one partition.
+ * Operators request the local properties they need for correct execution. Here are some example local
+ * properties requested by certain operators:
+ * <ul>
+ *     <li>"groupBy/reduce" will request the data to be grouped on the key fields.</li>
+ *     <li>A sort-merge join will request the data from each input to be sorted on the respective join key.</li>
+ * </ul>
  */
 public class RequestedLocalProperties implements Cloneable {
-
-	public static final RequestedLocalProperties DEFAULT_PROPERTIES = null;
-	
-	// --------------------------------------------------------------------------------------------
 	
 	private Ordering ordering;			// order inside a partition, null if not ordered
 
@@ -205,9 +207,9 @@ public class RequestedLocalProperties implements Cloneable {
 	}
 	
 	/**
-	 * Parameterizes the local strategy fields of a channel such that the channel produces the desired local properties.
+	 * Parametrizes the local strategy fields of a channel such that the channel produces the desired local properties.
 	 * 
-	 * @param channel The channel to parameterize.
+	 * @param channel The channel to parametrize.
 	 */
 	public void parameterizeChannel(Channel channel) {
 		LocalProperties current = channel.getLocalProperties();

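The sort-merge join example from the new Javadoc can be made concrete with a minimal standalone sketch (illustrative code, not the optimizer classes): once both inputs carry the requested local property of being sorted on the join key, the join is a single forward pass over each input.

import java.util.Arrays;
import java.util.List;

// Why a sort-merge join requests "sorted on the join key" as a local property:
// with both inputs sorted, matches are found in one forward pass per input.
public class MergeJoinSketch {

    static void mergeJoin(List<Integer> left, List<Integer> right) {
        int i = 0, j = 0;
        while (i < left.size() && j < right.size()) {
            int cmp = left.get(i).compareTo(right.get(j));
            if (cmp < 0) {
                i++;                       // advance the side with the smaller key
            } else if (cmp > 0) {
                j++;
            } else {
                System.out.println("match on key " + left.get(i));
                i++;
                j++;                       // simplified: ignores duplicate keys
            }
        }
    }

    public static void main(String[] args) {
        // Both inputs are already sorted on the join key, the requested local property.
        mergeJoin(Arrays.asList(1, 3, 4, 7), Arrays.asList(2, 3, 7, 9));
    }
}
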
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupCombineProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupCombineProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupCombineProperties.java
index eaf11ac..9ee56b0 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupCombineProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupCombineProperties.java
@@ -40,7 +40,7 @@ public final class AllGroupCombineProperties extends OperatorDescriptorSingle {
 
 	@Override
 	public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
-		return new SingleInputPlanNode(node, "GroupCombine ("+node.getPactContract().getName()+")", in, DriverStrategy.ALL_GROUP_COMBINE);
+		return new SingleInputPlanNode(node, "GroupCombine ("+node.getOperator().getName()+")", in, DriverStrategy.ALL_GROUP_COMBINE);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupReduceProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupReduceProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupReduceProperties.java
index 55d6fbb..9efd8c7 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupReduceProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupReduceProperties.java
@@ -40,7 +40,7 @@ public final class AllGroupReduceProperties extends OperatorDescriptorSingle {
 
 	@Override
 	public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
-		return new SingleInputPlanNode(node, "GroupReduce ("+node.getPactContract().getName()+")", in, DriverStrategy.ALL_GROUP_REDUCE);
+		return new SingleInputPlanNode(node, "GroupReduce ("+node.getOperator().getName()+")", in, DriverStrategy.ALL_GROUP_REDUCE);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java
index 7d07e7d..b3c083a 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java
@@ -46,7 +46,7 @@ public final class AllGroupWithPartialPreGroupProperties extends OperatorDescrip
 	public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
 		if (in.getShipStrategy() == ShipStrategyType.FORWARD) {
 			// locally connected, directly instantiate
-			return new SingleInputPlanNode(node, "GroupReduce ("+node.getPactContract().getName()+")",
+			return new SingleInputPlanNode(node, "GroupReduce ("+node.getOperator().getName()+")",
 											in, DriverStrategy.ALL_GROUP_REDUCE);
 		} else {
 			// non forward case.plug in a combiner
@@ -55,10 +55,10 @@ public final class AllGroupWithPartialPreGroupProperties extends OperatorDescrip
 			
 			// create an input node for combine with same DOP as input node
 			GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode();
-			combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
+			combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
 
 			SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode,
-					"Combine ("+node.getPactContract().getName()+")", toCombiner, DriverStrategy.ALL_GROUP_REDUCE_COMBINE);
+					"Combine ("+node.getOperator().getName()+")", toCombiner, DriverStrategy.ALL_GROUP_REDUCE_COMBINE);
 			combiner.setCosts(new Costs(0, 0));
 			combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());
 			
@@ -67,7 +67,7 @@ public final class AllGroupWithPartialPreGroupProperties extends OperatorDescrip
 										in.getShipStrategySortOrder(), in.getDataExchangeMode());
 
 			toReducer.setLocalStrategy(in.getLocalStrategy(), in.getLocalStrategyKeys(), in.getLocalStrategySortOrder());
-			return new SingleInputPlanNode(node, "GroupReduce ("+node.getPactContract().getName()+")",
+			return new SingleInputPlanNode(node, "GroupReduce ("+node.getOperator().getName()+")",
 											toReducer, DriverStrategy.ALL_GROUP_REDUCE);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java
index 4f6a4fd..a172a60 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java
@@ -45,7 +45,7 @@ public final class AllReduceProperties extends OperatorDescriptorSingle {
 	public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
 		if (in.getShipStrategy() == ShipStrategyType.FORWARD) {
 			// locally connected, directly instantiate
-			return new SingleInputPlanNode(node, "Reduce ("+node.getPactContract().getName()+")",
+			return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")",
 											in, DriverStrategy.ALL_REDUCE);
 		} else {
 			// non forward case.plug in a combiner
@@ -54,10 +54,10 @@ public final class AllReduceProperties extends OperatorDescriptorSingle {
 			
 			// create an input node for combine with same DOP as input node
 			ReduceNode combinerNode = ((ReduceNode) node).getCombinerUtilityNode();
-			combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
+			combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
 
 			SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode,
-					"Combine ("+node.getPactContract().getName()+")", toCombiner, DriverStrategy.ALL_REDUCE);
+					"Combine ("+node.getOperator().getName()+")", toCombiner, DriverStrategy.ALL_REDUCE);
 			combiner.setCosts(new Costs(0, 0));
 			combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());
 			
@@ -67,7 +67,7 @@ public final class AllReduceProperties extends OperatorDescriptorSingle {
 			toReducer.setLocalStrategy(in.getLocalStrategy(), in.getLocalStrategyKeys(),
 										in.getLocalStrategySortOrder());
 
-			return new SingleInputPlanNode(node, "Reduce ("+node.getPactContract().getName()+")",
+			return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")",
 											toReducer, DriverStrategy.ALL_REDUCE);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java
index c46ce56..f48e297 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java
@@ -93,7 +93,7 @@ public abstract class CartesianProductDescriptor extends OperatorDescriptorDual
 	
 	@Override
 	public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
-		return new DualInputPlanNode(node, "Cross("+node.getPactContract().getName()+")", in1, in2, getStrategy());
+		return new DualInputPlanNode(node, "Cross("+node.getOperator().getName()+")", in1, in2, getStrategy());
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java
index a17063e..368944e 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java
@@ -215,7 +215,7 @@ public class CoGroupDescriptor extends OperatorDescriptorDual {
 			inputOrders = tmp;
 		}
 		
-		return new DualInputPlanNode(node, "CoGroup ("+node.getPactContract().getName()+")", in1, in2, DriverStrategy.CO_GROUP, this.keys1, this.keys2, inputOrders);
+		return new DualInputPlanNode(node, "CoGroup ("+node.getOperator().getName()+")", in1, in2, DriverStrategy.CO_GROUP, this.keys1, this.keys2, inputOrders);
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupWithSolutionSetFirstDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupWithSolutionSetFirstDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupWithSolutionSetFirstDescriptor.java
index ffae66f..8e7edeb 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupWithSolutionSetFirstDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupWithSolutionSetFirstDescriptor.java
@@ -60,7 +60,7 @@ public class CoGroupWithSolutionSetFirstDescriptor extends CoGroupDescriptor {
 			inputOrders = tmp;
 		}
 
-		return new DualInputPlanNode(node, "CoGroup ("+node.getPactContract().getName()+")", in1, in2, DriverStrategy.CO_GROUP, this.keys1, this.keys2, inputOrders);
+		return new DualInputPlanNode(node, "CoGroup ("+node.getOperator().getName()+")", in1, in2, DriverStrategy.CO_GROUP, this.keys1, this.keys2, inputOrders);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java
index 92f7474..bcd4d73 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java
@@ -42,7 +42,7 @@ public class CollectorMapDescriptor extends OperatorDescriptorSingle {
 
 	@Override
 	public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
-		return new SingleInputPlanNode(node, "Map ("+node.getPactContract().getName()+")", in, DriverStrategy.COLLECTOR_MAP);
+		return new SingleInputPlanNode(node, "Map ("+node.getOperator().getName()+")", in, DriverStrategy.COLLECTOR_MAP);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java
index e7ff210..81c823f 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java
@@ -41,7 +41,7 @@ public class FilterDescriptor extends OperatorDescriptorSingle {
 
 	@Override
 	public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
-		return new SingleInputPlanNode(node, "Filter ("+node.getPactContract().getName()+")", in, DriverStrategy.FLAT_MAP);
+		return new SingleInputPlanNode(node, "Filter ("+node.getOperator().getName()+")", in, DriverStrategy.FLAT_MAP);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java
index 5384aa6..b915e45 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java
@@ -42,7 +42,7 @@ public class FlatMapDescriptor extends OperatorDescriptorSingle {
 
 	@Override
 	public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
-		return new SingleInputPlanNode(node, "FlatMap ("+node.getPactContract().getName()+")", in, DriverStrategy.FLAT_MAP);
+		return new SingleInputPlanNode(node, "FlatMap ("+node.getOperator().getName()+")", in, DriverStrategy.FLAT_MAP);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java
index d0e839a..b648386 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java
@@ -71,12 +71,12 @@ public final class GroupCombineProperties extends OperatorDescriptorSingle {
 
 	@Override
 	public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
-		node.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
+		node.setDegreeOfParallelism(in.getSource().getParallelism());
 		
 		// sorting key info
 		SingleInputPlanNode singleInputPlanNode = new SingleInputPlanNode(
 				node, 
-				"GroupCombine (" + node.getPactContract().getName() + ")",
+				"GroupCombine (" + node.getOperator().getName() + ")",
 				in, // reuse the combine strategy also used in the group reduce
 				DriverStrategy.SORTED_GROUP_COMBINE, this.keyList);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java
index c66321d..ebd09f2 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java
@@ -85,7 +85,7 @@ public final class GroupReduceProperties extends OperatorDescriptorSingle {
 
 	@Override
 	public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
-		return new SingleInputPlanNode(node, "GroupReduce ("+node.getPactContract().getName()+")", in, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
+		return new SingleInputPlanNode(node, "GroupReduce ("+node.getOperator().getName()+")", in, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
index 1caee6c..c4f47d3 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
@@ -98,7 +98,7 @@ public final class GroupReduceWithCombineProperties extends OperatorDescriptorSi
 				in.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(),
 									in.getLocalStrategySortOrder());
 			}
-			return new SingleInputPlanNode(node, "Reduce("+node.getPactContract().getName()+")", in,
+			return new SingleInputPlanNode(node, "Reduce("+node.getOperator().getName()+")", in,
 											DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
 		} else {
 			// non forward case. all local properties are killed anyways, so we can safely plug in a combiner
@@ -107,9 +107,9 @@ public final class GroupReduceWithCombineProperties extends OperatorDescriptorSi
 
 			// create an input node for combine with same DOP as input node
 			GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode();
-			combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
+			combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
 
-			SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getPactContract()
+			SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator()
 					.getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE);
 			combiner.setCosts(new Costs(0, 0));
 			combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());
@@ -124,7 +124,7 @@ public final class GroupReduceWithCombineProperties extends OperatorDescriptorSi
 			toReducer.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(),
 										in.getLocalStrategySortOrder());
 
-			return new SingleInputPlanNode(node, "Reduce ("+node.getPactContract().getName()+")",
+			return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")",
 											toReducer, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java
index a6c4500..fec72a9 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java
@@ -79,7 +79,7 @@ public class HashJoinBuildFirstProperties extends AbstractJoinDescriptor {
 		else {
 			strategy = DriverStrategy.HYBRIDHASH_BUILD_FIRST;
 		}
-		return new DualInputPlanNode(node, "Join("+node.getPactContract().getName()+")", in1, in2, strategy, this.keys1, this.keys2);
+		return new DualInputPlanNode(node, "Join("+node.getOperator().getName()+")", in1, in2, strategy, this.keys1, this.keys2);
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java
index 79cb3cb..f9d1e6c 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java
@@ -78,7 +78,7 @@ public final class HashJoinBuildSecondProperties extends AbstractJoinDescriptor
 		else {
 			strategy = DriverStrategy.HYBRIDHASH_BUILD_SECOND;
 		}
-		return new DualInputPlanNode(node, "Join ("+node.getPactContract().getName()+")", in1, in2, strategy, this.keys1, this.keys2);
+		return new DualInputPlanNode(node, "Join ("+node.getOperator().getName()+")", in1, in2, strategy, this.keys1, this.keys2);
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java
index e55a728..9f14d2a 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java
@@ -41,7 +41,7 @@ public class MapDescriptor extends OperatorDescriptorSingle {
 
 	@Override
 	public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
-		return new SingleInputPlanNode(node, "Map ("+node.getPactContract().getName()+")", in, DriverStrategy.MAP);
+		return new SingleInputPlanNode(node, "Map ("+node.getOperator().getName()+")", in, DriverStrategy.MAP);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java
index 8cef12d..1489097 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java
@@ -41,7 +41,7 @@ public class MapPartitionDescriptor extends OperatorDescriptorSingle {
 
 	@Override
 	public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
-		return new SingleInputPlanNode(node, "MapPartition ("+node.getPactContract().getName()+")", in, DriverStrategy.MAP_PARTITION);
+		return new SingleInputPlanNode(node, "MapPartition ("+node.getOperator().getName()+")", in, DriverStrategy.MAP_PARTITION);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
index fe9302a..2bde29b 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
@@ -48,10 +48,10 @@ public final class PartialGroupProperties extends OperatorDescriptorSingle {
 	@Override
 	public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
 		// create in input node for combine with same DOP as input node
-		GroupReduceNode combinerNode = new GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) node.getPactContract());
-		combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
+		GroupReduceNode combinerNode = new GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) node.getOperator());
+		combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
 
-		SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getPactContract().getName()+")", in,
+		SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator().getName()+")", in,
 				DriverStrategy.SORTED_GROUP_COMBINE);
 		// sorting key info
 		combiner.setDriverKeyInfo(in.getLocalStrategyKeys(), in.getLocalStrategySortOrder(), 0);

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
index 01096f0..5bb51f3 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
@@ -61,7 +61,7 @@ public final class ReduceProperties extends OperatorDescriptorSingle {
 		if (in.getShipStrategy() == ShipStrategyType.FORWARD ||
 				(node.getBroadcastConnections() != null && !node.getBroadcastConnections().isEmpty()))
 		{
-			return new SingleInputPlanNode(node, "Reduce ("+node.getPactContract().getName()+")", in,
+			return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")", in,
 											DriverStrategy.SORTED_REDUCE, this.keyList);
 		}
 		else {
@@ -71,10 +71,10 @@ public final class ReduceProperties extends OperatorDescriptorSingle {
 			
 			// create an input node for combine with same DOP as input node
 			ReduceNode combinerNode = ((ReduceNode) node).getCombinerUtilityNode();
-			combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
+			combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
 
 			SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode,
-								"Combine ("+node.getPactContract().getName()+")", toCombiner,
+								"Combine ("+node.getOperator().getName()+")", toCombiner,
 								DriverStrategy.SORTED_PARTIAL_REDUCE, this.keyList);
 
 			combiner.setCosts(new Costs(0, 0));
@@ -85,7 +85,7 @@ public final class ReduceProperties extends OperatorDescriptorSingle {
 										in.getShipStrategySortOrder(), in.getDataExchangeMode());
 			toReducer.setLocalStrategy(LocalStrategy.SORT, in.getLocalStrategyKeys(), in.getLocalStrategySortOrder());
 
-			return new SingleInputPlanNode(node, "Reduce("+node.getPactContract().getName()+")", toReducer,
+			return new SingleInputPlanNode(node, "Reduce("+node.getOperator().getName()+")", toReducer,
 											DriverStrategy.SORTED_REDUCE, this.keyList);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
index afe7e8d..356836a 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
@@ -99,7 +99,7 @@ public class SortMergeJoinDescriptor extends AbstractJoinDescriptor {
 			inputOrders = tmp;
 		}
 		
-		return new DualInputPlanNode(node, "Join("+node.getPactContract().getName()+")", in1, in2, DriverStrategy.MERGE, this.keys1, this.keys2, inputOrders);
+		return new DualInputPlanNode(node, "Join("+node.getOperator().getName()+")", in1, in2, DriverStrategy.MERGE, this.keys1, this.keys2, inputOrders);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java
index 093e968..bf22fb3 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java
@@ -42,7 +42,7 @@ public class BinaryUnionPlanNode extends DualInputPlanNode {
 		this.nodeCosts = toSwapFrom.nodeCosts;
 		this.cumulativeCosts = toSwapFrom.cumulativeCosts;
 		
-		setDegreeOfParallelism(toSwapFrom.getDegreeOfParallelism());
+		setParallelism(toSwapFrom.getParallelism());
 	}
 	
 	public BinaryUnionNode getOptimizerNode() {

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/Channel.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/Channel.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/Channel.java
index 8a2398c..875d1c3 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/Channel.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/Channel.java
@@ -37,7 +37,30 @@ import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
 
 /**
- * A Channel is a data exchange between two operators.
+ * A Channel represents the result produced by an operator and the data exchange
+ * before the consumption by the target operator.
+ *
+ * The channel defines and tracks various properties and characteristics of the
+ * data set and data exchange.
+ *
+ * Data set characteristics:
+ * <ul>
+ *     <li>The "global properties" of the data, i.e., how the data is distributed across
+ *         partitions</li>
+ *     <li>The "required global properties" of the data, i.e., the global properties that, if absent,
+ *         would cause the program to return a wrong result.</li>
+ *     <li>The "local properties" of the data, i.e., how the data is organized within a partition</li>
+ *     <li>The "required local properties" of the data, i.e., the local properties that, if absent,
+ *         would cause the program to return a wrong result.</li>
+ * </ul>
+ *
+ * Data exchange parameters:
+ * <ul>
+ *     <li>The "ship strategy", i.e., whether to forward the data, shuffle it, broadcast it, ...</li>
+ *     <li>The "ship keys", which are the positions of the key fields in the exchanged records.</li>
+ *     <li>The "data exchange mode", which defines whether to pipeline or batch the exchange</li>
+ *     <li>Several more...</li>
+ * </ul>
  */
 public class Channel implements EstimateProvider, Cloneable, DumpableConnection<PlanNode> {
 	

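The interplay of the properties listed in the new Channel Javadoc can be sketched in isolation (toy code with hypothetical names, not the real Channel API): when the properties the producer delivers do not satisfy the properties the consumer requires, the data exchange has to establish them, for example by re-partitioning.

// Toy illustration of how required vs. existing global properties determine the
// data exchange ("ship strategy") on a channel between two operators.
public class ShipStrategySketch {

    enum GlobalProperty { RANDOM_PARTITIONED, HASH_PARTITIONED_ON_KEY }

    enum ShipStrategy { FORWARD, PARTITION_HASH }

    static ShipStrategy chooseShipStrategy(GlobalProperty produced, GlobalProperty required) {
        // If the producer already delivers what the consumer requires,
        // the records can simply be forwarded; otherwise re-partition.
        return produced == required ? ShipStrategy.FORWARD : ShipStrategy.PARTITION_HASH;
    }

    public static void main(String[] args) {
        System.out.println(chooseShipStrategy(
                GlobalProperty.RANDOM_PARTITIONED, GlobalProperty.HASH_PARTITIONED_ON_KEY));
        System.out.println(chooseShipStrategy(
                GlobalProperty.HASH_PARTITIONED_ON_KEY, GlobalProperty.HASH_PARTITIONED_ON_KEY));
    }
}
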
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java
index eea9b67..01c56dd 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java
@@ -81,10 +81,10 @@ public class DualInputPlanNode extends PlanNode {
 		this.sortOrders = driverSortOrders;
 		
 		if (this.input1.getShipStrategy() == ShipStrategyType.BROADCAST) {
-			this.input1.setReplicationFactor(getDegreeOfParallelism());
+			this.input1.setReplicationFactor(getParallelism());
 		}
 		if (this.input2.getShipStrategy() == ShipStrategyType.BROADCAST) {
-			this.input2.setReplicationFactor(getDegreeOfParallelism());
+			this.input2.setReplicationFactor(getParallelism());
 		}
 		
 		mergeBranchPlanMaps(input1.getSource(), input2.getSource());

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
index e565909..6f634fb 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.optimizer.plan;
 
 import java.util.ArrayList;
@@ -73,7 +72,7 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
 	
 	private double relativeMemoryPerSubTask;					// the amount of memory dedicated to each task, in bytes
 	
-	private int degreeOfParallelism;
+	private int parallelism;
 	
 	private boolean pFlag;							// flag for the internal pruning algorithm
 	
@@ -86,7 +85,7 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
 		this.nodeName = nodeName;
 		this.driverStrategy = strategy;
 		
-		this.degreeOfParallelism = template.getDegreeOfParallelism();
+		this.parallelism = template.getParallelism();
 
 		// check, if there is branch at this node. if yes, this candidate must be associated with
 		// the branching template node.
@@ -134,21 +133,21 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
 	// --------------------------------------------------------------------------------------------
 	
 	/**
-	 * Gets the optimizer's pact node for which this plan candidate node was created.
+	 * Gets the node from the optimizer DAG for which this plan candidate node was created.
 	 * 
-	 * @return The template optimizer's node.
+	 * @return The optimizer's DAG node.
 	 */
 	public OptimizerNode getOriginalOptimizerNode() {
 		return this.template;
 	}
 	
 	/**
-	 * Gets the pact contract this node represents in the plan.
+	 * Gets the program operator that this node represents in the plan.
 	 * 
-	 * @return The pact contract this node represents in the plan.
+	 * @return The program operator this node represents in the plan.
 	 */
-	public Operator<?> getPactContract() {
-		return this.template.getPactContract();
+	public Operator<?> getProgramOperator() {
+		return this.template.getOperator();
 	}
 	
 	/**
@@ -252,7 +251,7 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
 			return null;
 		} else {
 			Costs result = cumulativeCosts.clone();
-			if (this.template != null && this.template.getOutgoingConnections() != null) {
+			if (this.template.getOutgoingConnections() != null) {
 				int outDegree = this.template.getOutgoingConnections().size();
 				if (outDegree > 0) {
 					result.divideBy(outDegree);
@@ -302,12 +301,12 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
 		}
 	}
 	
-	public void setDegreeOfParallelism(int parallelism) {
-		this.degreeOfParallelism = parallelism;
+	public void setParallelism(int parallelism) {
+		this.parallelism = parallelism;
 	}
 	
-	public int getDegreeOfParallelism() {
-		return this.degreeOfParallelism;
+	public int getParallelism() {
+		return this.parallelism;
 	}
 	
 	public long getGuaranteedAvailableMemory() {
@@ -514,7 +513,7 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
 
 	@Override
 	public String toString() {
-		return this.template.getName() + " \"" + getPactContract().getName() + "\" : " + this.driverStrategy +
+		return this.template.getName() + " \"" + getProgramOperator().getName() + "\" : " + this.driverStrategy +
 				" [[ " + this.globalProps + " ]] [[ " + this.localProps + " ]]";
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java
index cefd704..b928be7 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java
@@ -81,7 +81,7 @@ public class SingleInputPlanNode extends PlanNode {
 		}
 		
 		if (this.input.getShipStrategy() == ShipStrategyType.BROADCAST) {
-			this.input.setReplicationFactor(getDegreeOfParallelism());
+			this.input.setReplicationFactor(getParallelism());
 		}
 		
 		final PlanNode predNode = input.getSource();

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
index 4083a2a..6f918c0 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
@@ -39,7 +39,7 @@ import org.apache.flink.optimizer.dag.BulkIterationNode;
 import org.apache.flink.optimizer.dag.DataSinkNode;
 import org.apache.flink.optimizer.dag.DataSourceNode;
 import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.optimizer.dag.PactConnection;
+import org.apache.flink.optimizer.dag.DagConnection;
 import org.apache.flink.optimizer.dag.TempMode;
 import org.apache.flink.optimizer.dag.WorksetIterationNode;
 import org.apache.flink.optimizer.dataproperties.GlobalProperties;
@@ -231,18 +231,18 @@ public class PlanJSONDumpGenerator {
 		String contents;
 		if (n instanceof DataSinkNode) {
 			type = "sink";
-			contents = n.getPactContract().toString();
+			contents = n.getOperator().toString();
 		} else if (n instanceof DataSourceNode) {
 			type = "source";
-			contents = n.getPactContract().toString();
+			contents = n.getOperator().toString();
 		}
 		else if (n instanceof BulkIterationNode) {
 			type = "bulk_iteration";
-			contents = n.getPactContract().getName();
+			contents = n.getOperator().getName();
 		}
 		else if (n instanceof WorksetIterationNode) {
 			type = "workset_iteration";
-			contents = n.getPactContract().getName();
+			contents = n.getOperator().getName();
 		}
 		else if (n instanceof BinaryUnionNode) {
 			type = "pact";
@@ -250,7 +250,7 @@ public class PlanJSONDumpGenerator {
 		}
 		else {
 			type = "pact";
-			contents = n.getPactContract().getName();
+			contents = n.getOperator().getName();
 		}
 		
 		contents = StringUtils.showControlCharacters(contents);
@@ -277,7 +277,7 @@ public class PlanJSONDumpGenerator {
 
 		// degree of parallelism
 		writer.print(",\n\t\t\"parallelism\": \""
-			+ (n.getDegreeOfParallelism() >= 1 ? n.getDegreeOfParallelism() : "default") + "\"");
+			+ (n.getParallelism() >= 1 ? n.getParallelism() : "default") + "\"");
 		
 		// output node predecessors
 		Iterator<? extends DumpableConnection<?>> inConns = node.getDumpableInputs().iterator();
@@ -294,10 +294,10 @@ public class PlanJSONDumpGenerator {
 				writer.print(inputNum == 0 ? "\n" : ",\n");
 				if (inputNum == 0) {
 					child1name += child1name.length() > 0 ? ", " : ""; 
-					child1name += source.getOptimizerNode().getPactContract().getName();
+					child1name += source.getOptimizerNode().getOperator().getName();
 				} else if (inputNum == 1) {
 					child2name += child2name.length() > 0 ? ", " : ""; 
-					child2name = source.getOptimizerNode().getPactContract().getName();
+					child2name = source.getOptimizerNode().getOperator().getName();
 				}
 
 				// output predecessor id
@@ -310,7 +310,7 @@ public class PlanJSONDumpGenerator {
 				// output shipping strategy and channel type
 				final Channel channel = (inConn instanceof Channel) ? (Channel) inConn : null; 
 				final ShipStrategyType shipType = channel != null ? channel.getShipStrategy() :
-						((PactConnection) inConn).getShipStrategy();
+						((DagConnection) inConn).getShipStrategy();
 					
 				String shipStrategy = null;
 				if (shipType != null) {
@@ -588,8 +588,8 @@ public class PlanJSONDumpGenerator {
 		}
 
 		// output the node compiler hints
-		if (n.getPactContract().getCompilerHints() != null) {
-			CompilerHints hints = n.getPactContract().getCompilerHints();
+		if (n.getOperator().getCompilerHints() != null) {
+			CompilerHints hints = n.getOperator().getCompilerHints();
 			CompilerHints defaults = new CompilerHints();
 
 			String size = hints.getOutputSize() == defaults.getOutputSize() ? "(none)" : String.valueOf(hints.getOutputSize());