You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:07:29 UTC
[50/53] [abbrv] flink git commit: [FLINK-441] [optimizer] Rename
Pact* and Nephele* classes
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/InterestingProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/InterestingProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/InterestingProperties.java
index ced0e83..6946641 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/InterestingProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/InterestingProperties.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.optimizer.dataproperties;
import java.util.HashSet;
@@ -29,13 +28,16 @@ import org.apache.flink.optimizer.dag.SingleInputNode;
import org.apache.flink.optimizer.dag.TwoInputNode;
/**
- * The interesting properties that a node in the optimizer plan hands to its predecessors. It has the
- * purpose to tell the preceding nodes, which data properties might have the advantage, because they would
- * let the node fulfill its pact cheaper. More on optimization with interesting properties can be found
- * in the works on the volcano- and cascades optimizer framework.
+ * Interesting properties are propagated from parent operators to child operators. They tell the child
+ * what data properties would help the parent in operating in a cheaper fashion. A reduce operator, for
+ * example, tells its child that partitioned data would help. If the child is a join operator, it can use
+ * that knowledge to favor strategies that leave the data in a partitioned form.
+ *
+ * More on optimization with interesting properties can be found in the works on
+ * the volcano- and cascades optimizer framework.
*/
-public class InterestingProperties implements Cloneable
-{
+public class InterestingProperties implements Cloneable {
+
private Set<RequestedGlobalProperties> globalProps; // the global properties, i.e. properties across partitions
private Set<RequestedLocalProperties> localProps; // the local properties, i.e. properties within partitions
@@ -91,8 +93,7 @@ public class InterestingProperties implements Cloneable
return this.globalProps;
}
- public InterestingProperties filterByCodeAnnotations(OptimizerNode node, int input)
- {
+ public InterestingProperties filterByCodeAnnotations(OptimizerNode node, int input) {
InterestingProperties iProps = new InterestingProperties();
SemanticProperties props;
if (node instanceof SingleInputNode || node instanceof TwoInputNode) {
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/LocalProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/LocalProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/LocalProperties.java
index 0c3ea12..e0231aa 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/LocalProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/LocalProperties.java
@@ -31,7 +31,7 @@ import org.slf4j.LoggerFactory;
/**
* This class represents local properties of the data. A local property is a property that exists
- * within the data of a single partition.
+ * within the data of a single partition, such as sort order, or data grouping.
*/
public class LocalProperties implements Cloneable {
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/PartitioningProperty.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/PartitioningProperty.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/PartitioningProperty.java
index 674cdb8..5e06dd3 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/PartitioningProperty.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/PartitioningProperty.java
@@ -19,42 +19,45 @@
package org.apache.flink.optimizer.dataproperties;
/**
- * An enumeration tracking the different types of sharding strategies.
+ * An enumeration of the the different types of distributing data across partitions or
+ * parallel workers.
*/
public enum PartitioningProperty {
/**
- * Any data distribution, i.e., random partitioning or full replication.
+ * Any possible way of data distribution, including random partitioning and full replication.
*/
ANY_DISTRIBUTION,
/**
- * Constant indicating no particular partitioning (i.e. random) data distribution.
+ * A random disjunct (non-replicated) data distribution, where each datum is contained in one partition only.
+ * This is for example the result of parallel scans of data in a file system like HDFS,
+ * or the result of a round-robin data distribution.
*/
RANDOM_PARTITIONED,
/**
- * Constant indicating a hash partitioning.
+ * A hash partitioning on a certain key.
*/
HASH_PARTITIONED,
/**
- * Constant indicating a range partitioning.
+ * A range partitioning on a certain key.
*/
RANGE_PARTITIONED,
/**
- * Constant indicating any not further specified disjunct partitioning.
+ * A not further specified partitioning on a key (hash-, or range partitioning, or some other scheme even).
*/
ANY_PARTITIONING,
/**
- * Constant indicating full replication of the data to each parallel instance.
+ *Full replication of the data to each parallel instance.
*/
FULL_REPLICATION,
/**
- * Constant indicating a forced even re-balancing.
+ * A forced even re-balancing. All partitions are guaranteed to have almost the same number of records.
*/
FORCED_REBALANCED,
@@ -95,10 +98,10 @@ public enum PartitioningProperty {
/**
* Checks, if this property represents a partitioning that is computable.
- * Computable partitionings can be recreated through an algorithm. If two sets of data are to
+ * A computable partitioning can be recreated through an algorithm. If two sets of data are to
* be co-partitioned, it is crucial, that the partitioning schemes are computable.
* <p>
- * Examples for computable partitioning schemes are hash- or range-partitionings. An example for a non-computable
+ * Examples for computable partitioning schemes are hash- or range-partitioning. An example for a non-computable
* partitioning is the implicit partitioning that exists though a globally unique key.
*
* @return True, if this enum constant is a re-computable partitioning.
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedLocalProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedLocalProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedLocalProperties.java
index 102a0f0..a9d14e8 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedLocalProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedLocalProperties.java
@@ -29,14 +29,16 @@ import org.apache.flink.optimizer.util.Utils;
import org.apache.flink.runtime.operators.util.LocalStrategy;
/**
- * This class represents local properties of the data. A local property is a property that exists
- * within the data of a single partition.
+ * This class represents the local properties of the data that are requested by an operator.
+ * Local properties are the properties within one partition.
+ * Operators request the local properties they need for correct execution. Here are some example local
+ * properties requested by certain operators:
+ * <ul>
+ * <li>"groupBy/reduce" will request the data to be grouped on the key fields.</li>
+ * <li>A sort-merge join will request the data from each input to be sorted on the respective join key.</li>
+ * </ul>
*/
public class RequestedLocalProperties implements Cloneable {
-
- public static final RequestedLocalProperties DEFAULT_PROPERTIES = null;
-
- // --------------------------------------------------------------------------------------------
private Ordering ordering; // order inside a partition, null if not ordered
@@ -205,9 +207,9 @@ public class RequestedLocalProperties implements Cloneable {
}
/**
- * Parameterizes the local strategy fields of a channel such that the channel produces the desired local properties.
+ * Parametrizes the local strategy fields of a channel such that the channel produces the desired local properties.
*
- * @param channel The channel to parameterize.
+ * @param channel The channel to parametrize.
*/
public void parameterizeChannel(Channel channel) {
LocalProperties current = channel.getLocalProperties();
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupCombineProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupCombineProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupCombineProperties.java
index eaf11ac..9ee56b0 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupCombineProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupCombineProperties.java
@@ -40,7 +40,7 @@ public final class AllGroupCombineProperties extends OperatorDescriptorSingle {
@Override
public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
- return new SingleInputPlanNode(node, "GroupCombine ("+node.getPactContract().getName()+")", in, DriverStrategy.ALL_GROUP_COMBINE);
+ return new SingleInputPlanNode(node, "GroupCombine ("+node.getOperator().getName()+")", in, DriverStrategy.ALL_GROUP_COMBINE);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupReduceProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupReduceProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupReduceProperties.java
index 55d6fbb..9efd8c7 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupReduceProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupReduceProperties.java
@@ -40,7 +40,7 @@ public final class AllGroupReduceProperties extends OperatorDescriptorSingle {
@Override
public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
- return new SingleInputPlanNode(node, "GroupReduce ("+node.getPactContract().getName()+")", in, DriverStrategy.ALL_GROUP_REDUCE);
+ return new SingleInputPlanNode(node, "GroupReduce ("+node.getOperator().getName()+")", in, DriverStrategy.ALL_GROUP_REDUCE);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java
index 7d07e7d..b3c083a 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java
@@ -46,7 +46,7 @@ public final class AllGroupWithPartialPreGroupProperties extends OperatorDescrip
public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
if (in.getShipStrategy() == ShipStrategyType.FORWARD) {
// locally connected, directly instantiate
- return new SingleInputPlanNode(node, "GroupReduce ("+node.getPactContract().getName()+")",
+ return new SingleInputPlanNode(node, "GroupReduce ("+node.getOperator().getName()+")",
in, DriverStrategy.ALL_GROUP_REDUCE);
} else {
// non forward case.plug in a combiner
@@ -55,10 +55,10 @@ public final class AllGroupWithPartialPreGroupProperties extends OperatorDescrip
// create an input node for combine with same DOP as input node
GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode();
- combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
+ combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode,
- "Combine ("+node.getPactContract().getName()+")", toCombiner, DriverStrategy.ALL_GROUP_REDUCE_COMBINE);
+ "Combine ("+node.getOperator().getName()+")", toCombiner, DriverStrategy.ALL_GROUP_REDUCE_COMBINE);
combiner.setCosts(new Costs(0, 0));
combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());
@@ -67,7 +67,7 @@ public final class AllGroupWithPartialPreGroupProperties extends OperatorDescrip
in.getShipStrategySortOrder(), in.getDataExchangeMode());
toReducer.setLocalStrategy(in.getLocalStrategy(), in.getLocalStrategyKeys(), in.getLocalStrategySortOrder());
- return new SingleInputPlanNode(node, "GroupReduce ("+node.getPactContract().getName()+")",
+ return new SingleInputPlanNode(node, "GroupReduce ("+node.getOperator().getName()+")",
toReducer, DriverStrategy.ALL_GROUP_REDUCE);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java
index 4f6a4fd..a172a60 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java
@@ -45,7 +45,7 @@ public final class AllReduceProperties extends OperatorDescriptorSingle {
public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
if (in.getShipStrategy() == ShipStrategyType.FORWARD) {
// locally connected, directly instantiate
- return new SingleInputPlanNode(node, "Reduce ("+node.getPactContract().getName()+")",
+ return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")",
in, DriverStrategy.ALL_REDUCE);
} else {
// non forward case.plug in a combiner
@@ -54,10 +54,10 @@ public final class AllReduceProperties extends OperatorDescriptorSingle {
// create an input node for combine with same DOP as input node
ReduceNode combinerNode = ((ReduceNode) node).getCombinerUtilityNode();
- combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
+ combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode,
- "Combine ("+node.getPactContract().getName()+")", toCombiner, DriverStrategy.ALL_REDUCE);
+ "Combine ("+node.getOperator().getName()+")", toCombiner, DriverStrategy.ALL_REDUCE);
combiner.setCosts(new Costs(0, 0));
combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());
@@ -67,7 +67,7 @@ public final class AllReduceProperties extends OperatorDescriptorSingle {
toReducer.setLocalStrategy(in.getLocalStrategy(), in.getLocalStrategyKeys(),
in.getLocalStrategySortOrder());
- return new SingleInputPlanNode(node, "Reduce ("+node.getPactContract().getName()+")",
+ return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")",
toReducer, DriverStrategy.ALL_REDUCE);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java
index c46ce56..f48e297 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java
@@ -93,7 +93,7 @@ public abstract class CartesianProductDescriptor extends OperatorDescriptorDual
@Override
public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
- return new DualInputPlanNode(node, "Cross("+node.getPactContract().getName()+")", in1, in2, getStrategy());
+ return new DualInputPlanNode(node, "Cross("+node.getOperator().getName()+")", in1, in2, getStrategy());
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java
index a17063e..368944e 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java
@@ -215,7 +215,7 @@ public class CoGroupDescriptor extends OperatorDescriptorDual {
inputOrders = tmp;
}
- return new DualInputPlanNode(node, "CoGroup ("+node.getPactContract().getName()+")", in1, in2, DriverStrategy.CO_GROUP, this.keys1, this.keys2, inputOrders);
+ return new DualInputPlanNode(node, "CoGroup ("+node.getOperator().getName()+")", in1, in2, DriverStrategy.CO_GROUP, this.keys1, this.keys2, inputOrders);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupWithSolutionSetFirstDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupWithSolutionSetFirstDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupWithSolutionSetFirstDescriptor.java
index ffae66f..8e7edeb 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupWithSolutionSetFirstDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupWithSolutionSetFirstDescriptor.java
@@ -60,7 +60,7 @@ public class CoGroupWithSolutionSetFirstDescriptor extends CoGroupDescriptor {
inputOrders = tmp;
}
- return new DualInputPlanNode(node, "CoGroup ("+node.getPactContract().getName()+")", in1, in2, DriverStrategy.CO_GROUP, this.keys1, this.keys2, inputOrders);
+ return new DualInputPlanNode(node, "CoGroup ("+node.getOperator().getName()+")", in1, in2, DriverStrategy.CO_GROUP, this.keys1, this.keys2, inputOrders);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java
index 92f7474..bcd4d73 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java
@@ -42,7 +42,7 @@ public class CollectorMapDescriptor extends OperatorDescriptorSingle {
@Override
public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
- return new SingleInputPlanNode(node, "Map ("+node.getPactContract().getName()+")", in, DriverStrategy.COLLECTOR_MAP);
+ return new SingleInputPlanNode(node, "Map ("+node.getOperator().getName()+")", in, DriverStrategy.COLLECTOR_MAP);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java
index e7ff210..81c823f 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java
@@ -41,7 +41,7 @@ public class FilterDescriptor extends OperatorDescriptorSingle {
@Override
public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
- return new SingleInputPlanNode(node, "Filter ("+node.getPactContract().getName()+")", in, DriverStrategy.FLAT_MAP);
+ return new SingleInputPlanNode(node, "Filter ("+node.getOperator().getName()+")", in, DriverStrategy.FLAT_MAP);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java
index 5384aa6..b915e45 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java
@@ -42,7 +42,7 @@ public class FlatMapDescriptor extends OperatorDescriptorSingle {
@Override
public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
- return new SingleInputPlanNode(node, "FlatMap ("+node.getPactContract().getName()+")", in, DriverStrategy.FLAT_MAP);
+ return new SingleInputPlanNode(node, "FlatMap ("+node.getOperator().getName()+")", in, DriverStrategy.FLAT_MAP);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java
index d0e839a..b648386 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java
@@ -71,12 +71,12 @@ public final class GroupCombineProperties extends OperatorDescriptorSingle {
@Override
public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
- node.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
+ node.setDegreeOfParallelism(in.getSource().getParallelism());
// sorting key info
SingleInputPlanNode singleInputPlanNode = new SingleInputPlanNode(
node,
- "GroupCombine (" + node.getPactContract().getName() + ")",
+ "GroupCombine (" + node.getOperator().getName() + ")",
in, // reuse the combine strategy also used in the group reduce
DriverStrategy.SORTED_GROUP_COMBINE, this.keyList);
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java
index c66321d..ebd09f2 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java
@@ -85,7 +85,7 @@ public final class GroupReduceProperties extends OperatorDescriptorSingle {
@Override
public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
- return new SingleInputPlanNode(node, "GroupReduce ("+node.getPactContract().getName()+")", in, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
+ return new SingleInputPlanNode(node, "GroupReduce ("+node.getOperator().getName()+")", in, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
index 1caee6c..c4f47d3 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
@@ -98,7 +98,7 @@ public final class GroupReduceWithCombineProperties extends OperatorDescriptorSi
in.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(),
in.getLocalStrategySortOrder());
}
- return new SingleInputPlanNode(node, "Reduce("+node.getPactContract().getName()+")", in,
+ return new SingleInputPlanNode(node, "Reduce("+node.getOperator().getName()+")", in,
DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
} else {
// non forward case. all local properties are killed anyways, so we can safely plug in a combiner
@@ -107,9 +107,9 @@ public final class GroupReduceWithCombineProperties extends OperatorDescriptorSi
// create an input node for combine with same DOP as input node
GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode();
- combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
+ combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
- SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getPactContract()
+ SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator()
.getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE);
combiner.setCosts(new Costs(0, 0));
combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());
@@ -124,7 +124,7 @@ public final class GroupReduceWithCombineProperties extends OperatorDescriptorSi
toReducer.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(),
in.getLocalStrategySortOrder());
- return new SingleInputPlanNode(node, "Reduce ("+node.getPactContract().getName()+")",
+ return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")",
toReducer, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java
index a6c4500..fec72a9 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java
@@ -79,7 +79,7 @@ public class HashJoinBuildFirstProperties extends AbstractJoinDescriptor {
else {
strategy = DriverStrategy.HYBRIDHASH_BUILD_FIRST;
}
- return new DualInputPlanNode(node, "Join("+node.getPactContract().getName()+")", in1, in2, strategy, this.keys1, this.keys2);
+ return new DualInputPlanNode(node, "Join("+node.getOperator().getName()+")", in1, in2, strategy, this.keys1, this.keys2);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java
index 79cb3cb..f9d1e6c 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java
@@ -78,7 +78,7 @@ public final class HashJoinBuildSecondProperties extends AbstractJoinDescriptor
else {
strategy = DriverStrategy.HYBRIDHASH_BUILD_SECOND;
}
- return new DualInputPlanNode(node, "Join ("+node.getPactContract().getName()+")", in1, in2, strategy, this.keys1, this.keys2);
+ return new DualInputPlanNode(node, "Join ("+node.getOperator().getName()+")", in1, in2, strategy, this.keys1, this.keys2);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java
index e55a728..9f14d2a 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java
@@ -41,7 +41,7 @@ public class MapDescriptor extends OperatorDescriptorSingle {
@Override
public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
- return new SingleInputPlanNode(node, "Map ("+node.getPactContract().getName()+")", in, DriverStrategy.MAP);
+ return new SingleInputPlanNode(node, "Map ("+node.getOperator().getName()+")", in, DriverStrategy.MAP);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java
index 8cef12d..1489097 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java
@@ -41,7 +41,7 @@ public class MapPartitionDescriptor extends OperatorDescriptorSingle {
@Override
public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
- return new SingleInputPlanNode(node, "MapPartition ("+node.getPactContract().getName()+")", in, DriverStrategy.MAP_PARTITION);
+ return new SingleInputPlanNode(node, "MapPartition ("+node.getOperator().getName()+")", in, DriverStrategy.MAP_PARTITION);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
index fe9302a..2bde29b 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
@@ -48,10 +48,10 @@ public final class PartialGroupProperties extends OperatorDescriptorSingle {
@Override
public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
// create in input node for combine with same DOP as input node
- GroupReduceNode combinerNode = new GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) node.getPactContract());
- combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
+ GroupReduceNode combinerNode = new GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) node.getOperator());
+ combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
- SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getPactContract().getName()+")", in,
+ SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator().getName()+")", in,
DriverStrategy.SORTED_GROUP_COMBINE);
// sorting key info
combiner.setDriverKeyInfo(in.getLocalStrategyKeys(), in.getLocalStrategySortOrder(), 0);
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
index 01096f0..5bb51f3 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
@@ -61,7 +61,7 @@ public final class ReduceProperties extends OperatorDescriptorSingle {
if (in.getShipStrategy() == ShipStrategyType.FORWARD ||
(node.getBroadcastConnections() != null && !node.getBroadcastConnections().isEmpty()))
{
- return new SingleInputPlanNode(node, "Reduce ("+node.getPactContract().getName()+")", in,
+ return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")", in,
DriverStrategy.SORTED_REDUCE, this.keyList);
}
else {
@@ -71,10 +71,10 @@ public final class ReduceProperties extends OperatorDescriptorSingle {
// create an input node for combine with same DOP as input node
ReduceNode combinerNode = ((ReduceNode) node).getCombinerUtilityNode();
- combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
+ combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode,
- "Combine ("+node.getPactContract().getName()+")", toCombiner,
+ "Combine ("+node.getOperator().getName()+")", toCombiner,
DriverStrategy.SORTED_PARTIAL_REDUCE, this.keyList);
combiner.setCosts(new Costs(0, 0));
@@ -85,7 +85,7 @@ public final class ReduceProperties extends OperatorDescriptorSingle {
in.getShipStrategySortOrder(), in.getDataExchangeMode());
toReducer.setLocalStrategy(LocalStrategy.SORT, in.getLocalStrategyKeys(), in.getLocalStrategySortOrder());
- return new SingleInputPlanNode(node, "Reduce("+node.getPactContract().getName()+")", toReducer,
+ return new SingleInputPlanNode(node, "Reduce("+node.getOperator().getName()+")", toReducer,
DriverStrategy.SORTED_REDUCE, this.keyList);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
index afe7e8d..356836a 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
@@ -99,7 +99,7 @@ public class SortMergeJoinDescriptor extends AbstractJoinDescriptor {
inputOrders = tmp;
}
- return new DualInputPlanNode(node, "Join("+node.getPactContract().getName()+")", in1, in2, DriverStrategy.MERGE, this.keys1, this.keys2, inputOrders);
+ return new DualInputPlanNode(node, "Join("+node.getOperator().getName()+")", in1, in2, DriverStrategy.MERGE, this.keys1, this.keys2, inputOrders);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java
index 093e968..bf22fb3 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java
@@ -42,7 +42,7 @@ public class BinaryUnionPlanNode extends DualInputPlanNode {
this.nodeCosts = toSwapFrom.nodeCosts;
this.cumulativeCosts = toSwapFrom.cumulativeCosts;
- setDegreeOfParallelism(toSwapFrom.getDegreeOfParallelism());
+ setParallelism(toSwapFrom.getParallelism());
}
public BinaryUnionNode getOptimizerNode() {
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/Channel.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/Channel.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/Channel.java
index 8a2398c..875d1c3 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/Channel.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/Channel.java
@@ -37,7 +37,30 @@ import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
/**
- * A Channel is a data exchange between two operators.
+ * A Channel represents the result produced by an operator and the data exchange
+ * before the consumption by the target operator.
+ *
+ * The channel defines and tracks various properties and characteristics of the
+ * data set and data exchange.
+ *
+ * Data set characteristics:
+ * <ul>
+ * <li>The "global properties" of the data, i.e., how the data is distributed across
+ * partitions</li>
+ * <li>The "required global properties" of the data, i.e., the global properties that, if absent,
+ * would cause the program to return a wrong result.</li>
+ * <li>The "local properties" of the data, i.e., how the data is organized within a partition</li>
+ * <li>The "required local properties" of the data, i.e., the local properties that, if absent,
+ * would cause the program to return a wrong result.</li>
+ * </ul>
+ *
+ * Data exchange parameters:
+ * <ul>
+ * <li>The "ship strategy", i.e., whether to forward the data, shuffle it, broadcast it, ...</li>
+ * <li>The "ship keys", which are the positions of the key fields in the exchanged records.</li>
+ * <li>The "data exchange mode", which defines whether to pipeline or batch the exchange</li>
+ * <li>Several more...</li>
+ * </ul>
*/
public class Channel implements EstimateProvider, Cloneable, DumpableConnection<PlanNode> {
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java
index eea9b67..01c56dd 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java
@@ -81,10 +81,10 @@ public class DualInputPlanNode extends PlanNode {
this.sortOrders = driverSortOrders;
if (this.input1.getShipStrategy() == ShipStrategyType.BROADCAST) {
- this.input1.setReplicationFactor(getDegreeOfParallelism());
+ this.input1.setReplicationFactor(getParallelism());
}
if (this.input2.getShipStrategy() == ShipStrategyType.BROADCAST) {
- this.input2.setReplicationFactor(getDegreeOfParallelism());
+ this.input2.setReplicationFactor(getParallelism());
}
mergeBranchPlanMaps(input1.getSource(), input2.getSource());
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
index e565909..6f634fb 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.optimizer.plan;
import java.util.ArrayList;
@@ -73,7 +72,7 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
private double relativeMemoryPerSubTask; // the amount of memory dedicated to each task, in bytes
- private int degreeOfParallelism;
+ private int parallelism;
private boolean pFlag; // flag for the internal pruning algorithm
@@ -86,7 +85,7 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
this.nodeName = nodeName;
this.driverStrategy = strategy;
- this.degreeOfParallelism = template.getDegreeOfParallelism();
+ this.parallelism = template.getParallelism();
// check, if there is branch at this node. if yes, this candidate must be associated with
// the branching template node.
@@ -134,21 +133,21 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
// --------------------------------------------------------------------------------------------
/**
- * Gets the optimizer's pact node for which this plan candidate node was created.
+ * Gets the node from the optimizer DAG for which this plan candidate node was created.
*
- * @return The template optimizer's node.
+ * @return The optimizer's DAG node.
*/
public OptimizerNode getOriginalOptimizerNode() {
return this.template;
}
/**
- * Gets the pact contract this node represents in the plan.
+ * Gets the program operator that this node represents in the plan.
*
- * @return The pact contract this node represents in the plan.
+ * @return The program operator this node represents in the plan.
*/
- public Operator<?> getPactContract() {
- return this.template.getPactContract();
+ public Operator<?> getProgramOperator() {
+ return this.template.getOperator();
}
/**
@@ -252,7 +251,7 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
return null;
} else {
Costs result = cumulativeCosts.clone();
- if (this.template != null && this.template.getOutgoingConnections() != null) {
+ if (this.template.getOutgoingConnections() != null) {
int outDegree = this.template.getOutgoingConnections().size();
if (outDegree > 0) {
result.divideBy(outDegree);
@@ -302,12 +301,12 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
}
}
- public void setDegreeOfParallelism(int parallelism) {
- this.degreeOfParallelism = parallelism;
+ public void setParallelism(int parallelism) {
+ this.parallelism = parallelism;
}
- public int getDegreeOfParallelism() {
- return this.degreeOfParallelism;
+ public int getParallelism() {
+ return this.parallelism;
}
public long getGuaranteedAvailableMemory() {
@@ -514,7 +513,7 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
@Override
public String toString() {
- return this.template.getName() + " \"" + getPactContract().getName() + "\" : " + this.driverStrategy +
+ return this.template.getName() + " \"" + getProgramOperator().getName() + "\" : " + this.driverStrategy +
" [[ " + this.globalProps + " ]] [[ " + this.localProps + " ]]";
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java
index cefd704..b928be7 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java
@@ -81,7 +81,7 @@ public class SingleInputPlanNode extends PlanNode {
}
if (this.input.getShipStrategy() == ShipStrategyType.BROADCAST) {
- this.input.setReplicationFactor(getDegreeOfParallelism());
+ this.input.setReplicationFactor(getParallelism());
}
final PlanNode predNode = input.getSource();
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
index 4083a2a..6f918c0 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
@@ -39,7 +39,7 @@ import org.apache.flink.optimizer.dag.BulkIterationNode;
import org.apache.flink.optimizer.dag.DataSinkNode;
import org.apache.flink.optimizer.dag.DataSourceNode;
import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.optimizer.dag.PactConnection;
+import org.apache.flink.optimizer.dag.DagConnection;
import org.apache.flink.optimizer.dag.TempMode;
import org.apache.flink.optimizer.dag.WorksetIterationNode;
import org.apache.flink.optimizer.dataproperties.GlobalProperties;
@@ -231,18 +231,18 @@ public class PlanJSONDumpGenerator {
String contents;
if (n instanceof DataSinkNode) {
type = "sink";
- contents = n.getPactContract().toString();
+ contents = n.getOperator().toString();
} else if (n instanceof DataSourceNode) {
type = "source";
- contents = n.getPactContract().toString();
+ contents = n.getOperator().toString();
}
else if (n instanceof BulkIterationNode) {
type = "bulk_iteration";
- contents = n.getPactContract().getName();
+ contents = n.getOperator().getName();
}
else if (n instanceof WorksetIterationNode) {
type = "workset_iteration";
- contents = n.getPactContract().getName();
+ contents = n.getOperator().getName();
}
else if (n instanceof BinaryUnionNode) {
type = "pact";
@@ -250,7 +250,7 @@ public class PlanJSONDumpGenerator {
}
else {
type = "pact";
- contents = n.getPactContract().getName();
+ contents = n.getOperator().getName();
}
contents = StringUtils.showControlCharacters(contents);
@@ -277,7 +277,7 @@ public class PlanJSONDumpGenerator {
// degree of parallelism
writer.print(",\n\t\t\"parallelism\": \""
- + (n.getDegreeOfParallelism() >= 1 ? n.getDegreeOfParallelism() : "default") + "\"");
+ + (n.getParallelism() >= 1 ? n.getParallelism() : "default") + "\"");
// output node predecessors
Iterator<? extends DumpableConnection<?>> inConns = node.getDumpableInputs().iterator();
@@ -294,10 +294,10 @@ public class PlanJSONDumpGenerator {
writer.print(inputNum == 0 ? "\n" : ",\n");
if (inputNum == 0) {
child1name += child1name.length() > 0 ? ", " : "";
- child1name += source.getOptimizerNode().getPactContract().getName();
+ child1name += source.getOptimizerNode().getOperator().getName();
} else if (inputNum == 1) {
child2name += child2name.length() > 0 ? ", " : "";
- child2name = source.getOptimizerNode().getPactContract().getName();
+ child2name = source.getOptimizerNode().getOperator().getName();
}
// output predecessor id
@@ -310,7 +310,7 @@ public class PlanJSONDumpGenerator {
// output shipping strategy and channel type
final Channel channel = (inConn instanceof Channel) ? (Channel) inConn : null;
final ShipStrategyType shipType = channel != null ? channel.getShipStrategy() :
- ((PactConnection) inConn).getShipStrategy();
+ ((DagConnection) inConn).getShipStrategy();
String shipStrategy = null;
if (shipType != null) {
@@ -588,8 +588,8 @@ public class PlanJSONDumpGenerator {
}
// output the node compiler hints
- if (n.getPactContract().getCompilerHints() != null) {
- CompilerHints hints = n.getPactContract().getCompilerHints();
+ if (n.getOperator().getCompilerHints() != null) {
+ CompilerHints hints = n.getOperator().getCompilerHints();
CompilerHints defaults = new CompilerHints();
String size = hints.getOutputSize() == defaults.getOutputSize() ? "(none)" : String.valueOf(hints.getOutputSize());