You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/17 20:19:36 UTC
[04/51] [abbrv] flink git commit: [FLINK-2415] [optimizer] Create and attach proper JobGraph describing JSON plans to JobGraph for batch jobs.
[FLINK-2415] [optimizer] Create and attach proper JobGraph describing JSON plans to JobGraph for batch jobs.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff28981f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff28981f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff28981f
Branch: refs/heads/master
Commit: ff28981f97cc434fb00ff7c931004888a84ad6c6
Parents: b6f52df
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Aug 18 11:04:48 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Sep 17 14:21:49 2015 +0200
----------------------------------------------------------------------
.../operators/base/BulkIterationBase.java | 2 +-
.../operators/base/DeltaIterationBase.java | 6 +-
.../flink/optimizer/dag/BinaryUnionNode.java | 2 +-
.../flink/optimizer/dag/BulkIterationNode.java | 4 +-
.../optimizer/dag/BulkPartialSolutionNode.java | 2 +-
.../apache/flink/optimizer/dag/CoGroupNode.java | 2 +-
.../flink/optimizer/dag/CoGroupRawNode.java | 2 +-
.../flink/optimizer/dag/CollectorMapNode.java | 2 +-
.../apache/flink/optimizer/dag/CrossNode.java | 2 +-
.../flink/optimizer/dag/DagConnection.java | 4 +-
.../flink/optimizer/dag/DataSinkNode.java | 2 +-
.../flink/optimizer/dag/DataSourceNode.java | 2 +-
.../apache/flink/optimizer/dag/FilterNode.java | 2 +-
.../apache/flink/optimizer/dag/FlatMapNode.java | 2 +-
.../flink/optimizer/dag/GroupCombineNode.java | 2 +-
.../flink/optimizer/dag/GroupReduceNode.java | 17 +-
.../apache/flink/optimizer/dag/JoinNode.java | 2 +-
.../org/apache/flink/optimizer/dag/MapNode.java | 2 +-
.../flink/optimizer/dag/MapPartitionNode.java | 2 +-
.../apache/flink/optimizer/dag/MatchNode.java | 2 +-
.../flink/optimizer/dag/OptimizerNode.java | 4 +-
.../flink/optimizer/dag/PartitionNode.java | 2 +-
.../apache/flink/optimizer/dag/ReduceNode.java | 2 +-
.../apache/flink/optimizer/dag/SinkJoiner.java | 2 +-
.../flink/optimizer/dag/SolutionSetNode.java | 2 +-
.../flink/optimizer/dag/SortPartitionNode.java | 2 +-
.../flink/optimizer/dag/UnaryOperatorNode.java | 2 +-
.../optimizer/dag/WorksetIterationNode.java | 6 +-
.../apache/flink/optimizer/dag/WorksetNode.java | 2 +-
.../apache/flink/optimizer/plan/PlanNode.java | 2 +-
.../flink/optimizer/plan/SinkPlanNode.java | 5 +-
.../flink/optimizer/plan/SourcePlanNode.java | 1 -
.../plandump/PlanJSONDumpGenerator.java | 16 +-
.../plantranslate/JobGraphGenerator.java | 227 +++++++++---
.../optimizer/plantranslate/JsonMapper.java | 299 ++++++++++++++++
.../app/partials/jobs/job.plan.node.jade | 23 +-
.../app/scripts/modules/jobs/jobs.dir.coffee | 21 +-
flink-runtime-web/web-dashboard/web/js/index.js | 74 ++--
.../web-dashboard/web/js/vendor.js | 352 +++++++------------
.../web/partials/jobs/job.plan.node.html | 22 +-
.../apache/flink/runtime/jobgraph/JobEdge.java | 74 +++-
.../apache/flink/runtime/jobgraph/JobGraph.java | 11 -
.../flink/runtime/jobgraph/JobVertex.java | 64 +++-
.../jobgraph/jsonplan/JsonPlanGenerator.java | 142 ++++++++
.../flink/runtime/operators/DriverStrategy.java | 8 +-
.../flink/runtime/jobmanager/JobManager.scala | 11 +-
.../jobgraph/jsonplan/JsonGeneratorTest.java | 119 +++++++
.../jsonplan/JsonJobGraphGenerationTest.java | 352 +++++++++++++++++++
48 files changed, 1489 insertions(+), 421 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
index 6304197..4fd4de6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
@@ -50,7 +50,7 @@ import org.apache.flink.util.Visitor;
*/
public class BulkIterationBase<T> extends SingleInputOperator<T, T, AbstractRichFunction> implements IterationOperator {
- private static String DEFAULT_NAME = "<Unnamed Bulk Iteration>";
+ private static final String DEFAULT_NAME = "<Unnamed Bulk Iteration>";
public static final String TERMINATION_CRITERION_AGGREGATOR_NAME = "terminationCriterion.aggregator";
http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
index 2986534..9a7674c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
@@ -84,7 +84,7 @@ public class DeltaIterationBase<ST, WT> extends DualInputOperator<ST, WT, ST, Ab
}
public DeltaIterationBase(BinaryOperatorInformation<ST, WT, ST> operatorInfo, int[] keyPositions) {
- this(operatorInfo, keyPositions, "<Unnamed Workset-Iteration>");
+ this(operatorInfo, keyPositions, "<Unnamed Delta Iteration>");
}
public DeltaIterationBase(BinaryOperatorInformation<ST, WT, ST> operatorInfo, int keyPosition, String name) {
@@ -283,7 +283,7 @@ public class DeltaIterationBase<ST, WT> extends DualInputOperator<ST, WT, ST, Ab
private final DeltaIterationBase<?, WT> containingIteration;
public WorksetPlaceHolder(DeltaIterationBase<?, WT> container, OperatorInformation<WT> operatorInfo) {
- super(operatorInfo, "Workset Place Holder");
+ super(operatorInfo, "Workset");
this.containingIteration = container;
}
@@ -312,7 +312,7 @@ public class DeltaIterationBase<ST, WT> extends DualInputOperator<ST, WT, ST, Ab
protected final DeltaIterationBase<ST, ?> containingIteration;
public SolutionSetPlaceHolder(DeltaIterationBase<ST, ?> container, OperatorInformation<ST> operatorInfo) {
- super(operatorInfo, "Solution Set Place Holder");
+ super(operatorInfo, "Solution Set");
this.containingIteration = container;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
index 1600a50..fdd76a8 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
@@ -54,7 +54,7 @@ public class BinaryUnionNode extends TwoInputNode {
}
@Override
- public String getName() {
+ public String getOperatorName() {
return "Union";
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
index 5dd868e..3d95c22 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
@@ -184,7 +184,7 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
// --------------------------------------------------------------------------------------------
@Override
- public String getName() {
+ public String getOperatorName() {
return "Bulk Iteration";
}
@@ -352,7 +352,7 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
// 5) Create a candidate for the Iteration Node for every remaining plan of the step function.
if (terminationCriterion == null) {
for (PlanNode candidate : candidates) {
- BulkIterationPlanNode node = new BulkIterationPlanNode(this, "BulkIteration ("+this.getOperator().getName()+")", in, pspn, candidate);
+ BulkIterationPlanNode node = new BulkIterationPlanNode(this, this.getOperator().getName(), in, pspn, candidate);
GlobalProperties gProps = candidate.getGlobalProperties().clone();
LocalProperties lProps = candidate.getLocalProperties().clone();
node.initProperties(gProps, lProps);
http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java
index 25a7eef..0a02dd1 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java
@@ -84,7 +84,7 @@ public class BulkPartialSolutionNode extends AbstractPartialSolutionNode {
}
@Override
- public String getName() {
+ public String getOperatorName() {
return "Bulk Partial Solution";
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java
index 20bad0d..58bce4b 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java
@@ -59,7 +59,7 @@ public class CoGroupNode extends TwoInputNode {
}
@Override
- public String getName() {
+ public String getOperatorName() {
return "CoGroup";
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupRawNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupRawNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupRawNode.java
index 971d244..dcc3102 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupRawNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupRawNode.java
@@ -48,7 +48,7 @@ public class CoGroupRawNode extends TwoInputNode {
}
@Override
- public String getName() {
+ public String getOperatorName() {
return "CoGroup";
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java
index 93be1e4..9c1bcd3 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java
@@ -42,7 +42,7 @@ public class CollectorMapNode extends SingleInputNode {
}
@Override
- public String getName() {
+ public String getOperatorName() {
return "Map";
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java
index 8de67e8..622bfac 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java
@@ -104,7 +104,7 @@ public class CrossNode extends TwoInputNode {
}
@Override
- public String getName() {
+ public String getOperatorName() {
return "Cross";
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java
index 4e65976..1f98a11 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java
@@ -267,7 +267,7 @@ public class DagConnection implements EstimateProvider, DumpableConnection<Optim
buf.append("null");
} else {
buf.append(this.source.getOperator().getName());
- buf.append('(').append(this.source.getName()).append(')');
+ buf.append('(').append(this.source.getOperatorName()).append(')');
}
buf.append(" -> ");
@@ -282,7 +282,7 @@ public class DagConnection implements EstimateProvider, DumpableConnection<Optim
buf.append("null");
} else {
buf.append(this.target.getOperator().getName());
- buf.append('(').append(this.target.getName()).append(')');
+ buf.append('(').append(this.target.getOperatorName()).append(')');
}
return buf.toString();
http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
index 6ca1149..b35613f 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
@@ -92,7 +92,7 @@ public class DataSinkNode extends OptimizerNode {
}
@Override
- public String getName() {
+ public String getOperatorName() {
return "Data Sink";
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java
index 6010f6a..867698d 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java
@@ -110,7 +110,7 @@ public class DataSourceNode extends OptimizerNode {
}
@Override
- public String getName() {
+ public String getOperatorName() {
return "Data Source";
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java
index 118ddc8..cb4e7bd 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java
@@ -46,7 +46,7 @@ public class FilterNode extends SingleInputNode {
}
@Override
- public String getName() {
+ public String getOperatorName() {
return "Filter";
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java
index f713d56..7dd84b2 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java
@@ -45,7 +45,7 @@ public class FlatMapNode extends SingleInputNode {
}
@Override
- public String getName() {
+ public String getOperatorName() {
return "FlatMap";
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
index 766d6af..7a1dd34 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
@@ -83,7 +83,7 @@ public class GroupCombineNode extends SingleInputNode {
}
@Override
- public String getName() {
+ public String getOperatorName() {
return "GroupCombine";
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
index 51da36b..bd118ec 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
@@ -24,7 +24,6 @@ import java.util.List;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.SingleInputOperator;
import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
import org.apache.flink.optimizer.CompilerException;
@@ -45,6 +44,8 @@ public class GroupReduceNode extends SingleInputNode {
private final List<OperatorDescriptorSingle> possibleProperties;
+ private final String operatorName;
+
private GroupReduceNode combinerUtilityNode;
/**
@@ -54,6 +55,7 @@ public class GroupReduceNode extends SingleInputNode {
*/
public GroupReduceNode(GroupReduceOperatorBase<?, ?, ?> operator) {
super(operator);
+ this.operatorName = "GroupReduce";
if (this.keys == null) {
// case of a key-less reducer. force a parallelism of 1
@@ -63,8 +65,9 @@ public class GroupReduceNode extends SingleInputNode {
this.possibleProperties = initPossibleProperties(operator.getCustomPartitioner());
}
- public GroupReduceNode(GroupReduceNode reducerToCopyForCombiner) {
+ private GroupReduceNode(GroupReduceNode reducerToCopyForCombiner) {
super(reducerToCopyForCombiner);
+ this.operatorName = "GroupCombine";
this.possibleProperties = Collections.emptyList();
}
@@ -95,7 +98,7 @@ public class GroupReduceNode extends SingleInputNode {
// check if we can work with a grouping (simple reducer), or if we need ordering because of a group order
Ordering groupOrder = null;
if (getOperator() instanceof GroupReduceOperatorBase) {
- groupOrder = ((GroupReduceOperatorBase<?, ?, ?>) getOperator()).getGroupOrder();
+ groupOrder = getOperator().getGroupOrder();
if (groupOrder != null && groupOrder.getNumberOfFields() == 0) {
groupOrder = null;
}
@@ -131,8 +134,8 @@ public class GroupReduceNode extends SingleInputNode {
}
@Override
- public String getName() {
- return "GroupReduce";
+ public String getOperatorName() {
+ return this.operatorName;
}
@Override
@@ -142,10 +145,8 @@ public class GroupReduceNode extends SingleInputNode {
@Override
protected SemanticProperties getSemanticPropertiesForLocalPropertyFiltering() {
-
// Local properties for GroupReduce may only be preserved on key fields.
- SingleInputSemanticProperties origProps =
- ((SingleInputOperator<?,?,?>) getOperator()).getSemanticProperties();
+ SingleInputSemanticProperties origProps = getOperator().getSemanticProperties();
SingleInputSemanticProperties filteredProps = new SingleInputSemanticProperties();
FieldSet readSet = origProps.getReadFields(0);
if(readSet != null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java
index cbd58ca..02c9b5b 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java
@@ -67,7 +67,7 @@ public class JoinNode extends TwoInputNode {
}
@Override
- public String getName() {
+ public String getOperatorName() {
return "Join";
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapNode.java
index 35def59..afbf4a8 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapNode.java
@@ -45,7 +45,7 @@ public class MapNode extends SingleInputNode {
}
@Override
- public String getName() {
+ public String getOperatorName() {
return "Map";
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapPartitionNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapPartitionNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapPartitionNode.java
index 6914c15..575dd63 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapPartitionNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapPartitionNode.java
@@ -49,7 +49,7 @@ public class MapPartitionNode extends SingleInputNode {
}
@Override
- public String getName() {
+ public String getOperatorName() {
return "MapPartition";
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java
index de3cd22..ee8ab05 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java
@@ -63,7 +63,7 @@ public class MatchNode extends TwoInputNode {
}
@Override
- public String getName() {
+ public String getOperatorName() {
return "Join";
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
index 9688bb8..490c304 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
@@ -152,7 +152,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
*
* @return The node name.
*/
- public abstract String getName();
+ public abstract String getOperatorName();
/**
* This function connects the predecessors to this operator.
@@ -1119,7 +1119,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
public String toString() {
StringBuilder bld = new StringBuilder();
- bld.append(getName());
+ bld.append(getOperatorName());
bld.append(" (").append(getOperator().getName()).append(") ");
int i = 1;
http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java
index 5c811b0..33383cb 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java
@@ -59,7 +59,7 @@ public class PartitionNode extends SingleInputNode {
}
@Override
- public String getName() {
+ public String getOperatorName() {
return "Partition";
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
index 52bfb6a..ed010bb 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
@@ -67,7 +67,7 @@ public class ReduceNode extends SingleInputNode {
}
@Override
- public String getName() {
+ public String getOperatorName() {
return "Reduce";
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java
index 06606f0..979d254 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java
@@ -51,7 +51,7 @@ public class SinkJoiner extends TwoInputNode {
}
@Override
- public String getName() {
+ public String getOperatorName() {
return "Internal Utility Node";
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java
index 1292cf5..9b53999 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java
@@ -79,7 +79,7 @@ public class SolutionSetNode extends AbstractPartialSolutionNode {
}
@Override
- public String getName() {
+ public String getOperatorName() {
return "Solution Set";
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java
index 83bc39a..8037533 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java
@@ -56,7 +56,7 @@ public class SortPartitionNode extends SingleInputNode {
}
@Override
- public String getName() {
+ public String getOperatorName() {
return "Sort-Partition";
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java
index 45ecdac..0c48033 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java
@@ -53,7 +53,7 @@ public class UnaryOperatorNode extends SingleInputNode {
}
@Override
- public String getName() {
+ public String getOperatorName() {
return this.name;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
index 99c868c..15b9a50 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
@@ -222,7 +222,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
// --------------------------------------------------------------------------------------------
@Override
- public String getName() {
+ public String getOperatorName() {
return "Workset Iteration";
}
@@ -454,7 +454,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
}
WorksetIterationPlanNode wsNode = new WorksetIterationPlanNode(this,
- "WorksetIteration ("+this.getOperator().getName()+")", solutionSetIn,
+ this.getOperator().getName(), solutionSetIn,
worksetIn, sspn, wspn, worksetCandidate, solutionSetCandidate);
wsNode.setImmediateSolutionSetUpdate(immediateDeltaUpdate);
wsNode.initProperties(gp, lp);
@@ -572,7 +572,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
}
@Override
- public String getName() {
+ public String getOperatorName() {
return "Internal Utility Node";
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java
index 3b05aba..ae636c5 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java
@@ -84,7 +84,7 @@ public class WorksetNode extends AbstractPartialSolutionNode {
}
@Override
- public String getName() {
+ public String getOperatorName() {
return "Workset";
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
index 6f634fb..9505a57 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
@@ -513,7 +513,7 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
@Override
public String toString() {
- return this.template.getName() + " \"" + getProgramOperator().getName() + "\" : " + this.driverStrategy +
+ return this.template.getOperatorName() + " \"" + getProgramOperator().getName() + "\" : " + this.driverStrategy +
" [[ " + this.globalProps + " ]] [[ " + this.localProps + " ]]";
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java
index 656e67f..c28a5c3 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.optimizer.plan;
import org.apache.flink.optimizer.dag.DataSinkNode;
@@ -25,8 +24,8 @@ import org.apache.flink.runtime.operators.DriverStrategy;
/**
* Plan candidate node for data flow sinks.
*/
-public class SinkPlanNode extends SingleInputPlanNode
-{
+public class SinkPlanNode extends SingleInputPlanNode {
+
/**
* Constructs a new sink candidate node that uses <i>NONE</i> as its local strategy. Note that
* local sorting and range partitioning are handled by the incoming channel already.
http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java
index 11b7cc9..937fe71 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.optimizer.plan;
import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
index dc99fd7..e248b0b 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
@@ -39,7 +39,6 @@ import org.apache.flink.optimizer.dag.BulkIterationNode;
import org.apache.flink.optimizer.dag.DataSinkNode;
import org.apache.flink.optimizer.dag.DataSourceNode;
import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.optimizer.dag.DagConnection;
import org.apache.flink.optimizer.dag.TempMode;
import org.apache.flink.optimizer.dag.WorksetIterationNode;
import org.apache.flink.optimizer.dataproperties.GlobalProperties;
@@ -56,9 +55,7 @@ import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.util.StringUtils;
-/**
- *
- */
+
public class PlanJSONDumpGenerator {
private Map<DumpableNode<?>, Integer> nodeIds; // resolves pact nodes to ids
@@ -260,7 +257,7 @@ public class PlanJSONDumpGenerator {
}
- String name = n.getName();
+ String name = n.getOperatorName();
if (name.equals("Reduce") && (node instanceof SingleInputPlanNode) &&
((SingleInputPlanNode) node).getDriverStrategy() == DriverStrategy.SORTED_GROUP_COMBINE) {
name = "Combine";
@@ -309,8 +306,9 @@ public class PlanJSONDumpGenerator {
}
// output shipping strategy and channel type
final Channel channel = (inConn instanceof Channel) ? (Channel) inConn : null;
- final ShipStrategyType shipType = channel != null ? channel.getShipStrategy() :
- ((DagConnection) inConn).getShipStrategy();
+ final ShipStrategyType shipType = channel != null ?
+ channel.getShipStrategy() :
+ inConn.getShipStrategy();
String shipStrategy = null;
if (shipType != null) {
@@ -623,11 +621,11 @@ public class PlanJSONDumpGenerator {
writer.print("\" }");
}
- public static final String formatNumber(double number) {
+ public static String formatNumber(double number) {
return formatNumber(number, "");
}
- public static final String formatNumber(double number, String suffix) {
+ public static String formatNumber(double number, String suffix) {
if (number <= 0.0) {
return String.valueOf(number);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 943ec2e..0cbcea8 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -18,6 +18,8 @@
package org.apache.flink.optimizer.plantranslate;
+import com.fasterxml.jackson.core.JsonFactory;
+
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.aggregators.AggregatorRegistry;
import org.apache.flink.api.common.aggregators.AggregatorWithName;
@@ -48,7 +50,7 @@ import org.apache.flink.optimizer.plan.WorksetPlanNode;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
+import org.apache.flink.optimizer.util.Utils;
import org.apache.flink.runtime.io.network.DataExchangeMode;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.iterative.convergence.WorksetEmptyConvergenceCriterion;
@@ -56,6 +58,7 @@ import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
import org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask;
import org.apache.flink.runtime.iterative.task.IterationTailPactTask;
+import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.InputFormatVertex;
@@ -78,6 +81,7 @@ import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.StringUtils;
import org.apache.flink.util.Visitor;
import java.io.IOException;
@@ -107,7 +111,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
private static final boolean mergeIterationAuxTasks = GlobalConfiguration.getBoolean(MERGE_ITERATION_AUX_TASKS_KEY, false);
- private static final TaskInChain ALREADY_VISITED_PLACEHOLDER = new TaskInChain(null, null, null);
+ private static final TaskInChain ALREADY_VISITED_PLACEHOLDER = new TaskInChain(null, null, null, null);
// ------------------------------------------------------------------------
@@ -193,21 +197,27 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
t.addChainedTask(tic.getChainedTask(), tic.getTaskConfig(), tic.getTaskName());
}
+ // ----- attach the additional info to the job vertices, for display in the runtime monitor
+
+ attachOperatorNamesAndDescriptions();
+
+ // ----------- finalize the job graph -----------
+
// create the job graph object
JobGraph graph = new JobGraph(program.getJobName());
graph.setNumberOfExecutionRetries(program.getOriginalPactPlan().getNumberOfExecutionRetries());
graph.setAllowQueuedScheduling(false);
-
+
// add vertices to the graph
for (JobVertex vertex : this.vertices.values()) {
graph.addVertex(vertex);
}
-
+
for (JobVertex vertex : this.auxVertices) {
graph.addVertex(vertex);
vertex.setSlotSharingGroup(sharingGroup);
}
-
+
// add registered cache file into job configuration
for (Entry<String, DistributedCacheEntry> e : program.getOriginalPactPlan().getCachedFiles()) {
DistributedCache.writeFileInfoToConfig(e.getKey(), e.getValue(), graph.getJobConfiguration());
@@ -222,9 +232,6 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
throw new RuntimeException("Config object could not be written to Job Configuration: " + e);
}
- String jsonPlan = new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(program);
- graph.setJsonPlan(jsonPlan);
-
// release all references again
this.vertices = null;
this.chainedTasks = null;
@@ -507,10 +514,10 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
// update name of container task
String containerTaskName = container.getName();
- if(containerTaskName.startsWith("CHAIN ")) {
- container.setName(containerTaskName+" -> "+chainedTask.getTaskName());
+ if (containerTaskName.startsWith("CHAIN ")) {
+ container.setName(containerTaskName + " -> " + chainedTask.getTaskName());
} else {
- container.setName("CHAIN "+containerTaskName+" -> "+chainedTask.getTaskName());
+ container.setName("CHAIN " + containerTaskName + " -> " + chainedTask.getTaskName());
}
this.chainedTasksInSequence.add(chainedTask);
@@ -581,7 +588,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
}
} catch (Exception e) {
throw new CompilerException(
- "An error occurred while translating the optimized plan to a nephele JobGraph: " + e.getMessage(), e);
+ "An error occurred while translating the optimized plan to a JobGraph: " + e.getMessage(), e);
}
}
@@ -612,7 +619,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
// check if the iteration's input is a union
if (iterationNode.getInput().getSource() instanceof NAryUnionPlanNode) {
- allInChannels = ((NAryUnionPlanNode) iterationNode.getInput().getSource()).getInputs().iterator();
+ allInChannels = (iterationNode.getInput().getSource()).getInputs().iterator();
} else {
allInChannels = Collections.singletonList(iterationNode.getInput()).iterator();
}
@@ -631,7 +638,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
// check if the iteration's input is a union
if (iterationNode.getInput2().getSource() instanceof NAryUnionPlanNode) {
- allInChannels = ((NAryUnionPlanNode) iterationNode.getInput2().getSource()).getInputs().iterator();
+ allInChannels = (iterationNode.getInput2().getSource()).getInputs().iterator();
} else {
allInChannels = Collections.singletonList(iterationNode.getInput2()).iterator();
}
@@ -760,7 +767,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
final DriverStrategy ds = node.getDriverStrategy();
// check, whether chaining is possible
- boolean chaining = false;
+ boolean chaining;
{
Channel inConn = node.getInput();
PlanNode pred = inConn.getSource();
@@ -804,7 +811,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
if (chaining) {
vertex = null;
config = new TaskConfig(new Configuration());
- this.chainedTasks.put(node, new TaskInChain(ds.getPushChainDriverClass(), config, taskName));
+ this.chainedTasks.put(node, new TaskInChain(node, ds.getPushChainDriverClass(), config, taskName));
} else {
// create task vertex
vertex = new JobVertex(taskName);
@@ -820,7 +827,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
// set the driver strategy
config.setDriverStrategy(ds);
- for(int i=0;i<ds.getNumRequiredComparators();i++) {
+ for (int i = 0; i < ds.getNumRequiredComparators(); i++) {
config.setDriverComparator(node.getComparator(i), i);
}
// assign memory, file-handles, etc.
@@ -922,11 +929,11 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
final TaskConfig headConfig;
if (merge) {
final PlanNode successor = pspn.getOutgoingChannels().get(0).getTarget();
- headVertex = (JobVertex) this.vertices.get(successor);
+ headVertex = this.vertices.get(successor);
if (headVertex == null) {
throw new CompilerException(
- "Bug: Trying to merge solution set with its sucessor, but successor has not been created.");
+ "Bug: Trying to merge solution set with its successor, but successor has not been created.");
}
// reset the vertex type to iteration head
@@ -990,7 +997,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
final TaskConfig headConfig;
if (merge) {
final PlanNode successor = wspn.getOutgoingChannels().get(0).getTarget();
- headVertex = (JobVertex) this.vertices.get(successor);
+ headVertex = this.vertices.get(successor);
if (headVertex == null) {
throw new CompilerException(
@@ -1050,13 +1057,6 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
* channel is then the channel into the union node, the local strategy channel the one from the union to the
* actual target operator.
*
- * @param channel
- * @param inputNumber
- * @param sourceVertex
- * @param sourceConfig
- * @param targetVertex
- * @param targetConfig
- * @param isBroadcast
* @throws CompilerException
*/
private DistributionPattern connectJobVertices(Channel channel, int inputNumber,
@@ -1109,7 +1109,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
}
- targetVertex.connectNewDataSetAsInput(sourceVertex, distributionPattern, resultType);
+ JobEdge edge = targetVertex.connectNewDataSetAsInput(sourceVertex, distributionPattern, resultType);
// -------------- configure the source task's ship strategy strategies in task config --------------
final int outputIndex = sourceConfig.getNumOutputs();
@@ -1146,6 +1146,35 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
} else {
targetConfig.addInputToGroup(inputNumber);
}
+
+ // ---------------- attach the additional infos to the job edge -------------------
+
+ String shipStrategy = JsonMapper.getShipStrategyString(channel.getShipStrategy());
+ if (channel.getShipStrategyKeys() != null && channel.getShipStrategyKeys().size() > 0) {
+ shipStrategy += " on " + (channel.getShipStrategySortOrder() == null ?
+ channel.getShipStrategyKeys().toString() :
+ Utils.createOrdering(channel.getShipStrategyKeys(), channel.getShipStrategySortOrder()).toString());
+ }
+
+ String localStrategy;
+ if (channel.getLocalStrategy() == null || channel.getLocalStrategy() == LocalStrategy.NONE) {
+ localStrategy = null;
+ }
+ else {
+ localStrategy = JsonMapper.getLocalStrategyString(channel.getLocalStrategy());
+ if (localStrategy != null && channel.getLocalStrategyKeys() != null && channel.getLocalStrategyKeys().size() > 0) {
+ localStrategy += " on " + (channel.getLocalStrategySortOrder() == null ?
+ channel.getLocalStrategyKeys().toString() :
+ Utils.createOrdering(channel.getLocalStrategyKeys(), channel.getLocalStrategySortOrder()).toString());
+ }
+ }
+
+ String caching = channel.getTempMode() == TempMode.NONE ? null : channel.getTempMode().toString();
+
+ edge.setShipStrategyName(shipStrategy);
+ edge.setPreProcessingOperationName(localStrategy);
+ edge.setOperatorLevelCachingDescription(caching);
+
return distributionPattern;
}
@@ -1191,7 +1220,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
if (needsMemory) {
// sanity check
- if (tm == null || tm == TempMode.NONE || channel.getRelativeTempMemory() <= 0) {
+ if (tm == TempMode.NONE || channel.getRelativeTempMemory() <= 0) {
throw new CompilerException("Bug in compiler: Inconsistent description of input materialization.");
}
config.setRelativeInputMaterializationMemory(inputNum, channel.getRelativeTempMemory());
@@ -1247,14 +1276,14 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
final PlanNode rootOfStepFunction = bulkNode.getRootOfStepFunction();
final TaskConfig tailConfig;
- JobVertex rootOfStepFunctionVertex = (JobVertex) this.vertices.get(rootOfStepFunction);
+ JobVertex rootOfStepFunctionVertex = this.vertices.get(rootOfStepFunction);
if (rootOfStepFunctionVertex == null) {
// last op is chained
final TaskInChain taskInChain = this.chainedTasks.get(rootOfStepFunction);
if (taskInChain == null) {
throw new CompilerException("Bug: Tail of step function not found as vertex or chained task.");
}
- rootOfStepFunctionVertex = (JobVertex) taskInChain.getContainingVertex();
+ rootOfStepFunctionVertex = taskInChain.getContainingVertex();
// the fake channel is statically typed to pact record. no data is sent over this channel anyways.
tailConfig = taskInChain.getTaskConfig();
@@ -1277,7 +1306,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
final TaskConfig tailConfigOfTerminationCriterion;
// If we have a termination criterion and it is not an intermediate node
if(rootOfTerminationCriterion != null && rootOfTerminationCriterion.getOutgoingChannels().isEmpty()) {
- JobVertex rootOfTerminationCriterionVertex = (JobVertex) this.vertices.get(rootOfTerminationCriterion);
+ JobVertex rootOfTerminationCriterionVertex = this.vertices.get(rootOfTerminationCriterion);
if (rootOfTerminationCriterionVertex == null) {
@@ -1286,7 +1315,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
if (taskInChain == null) {
throw new CompilerException("Bug: Tail of termination criterion not found as vertex or chained task.");
}
- rootOfTerminationCriterionVertex = (JobVertex) taskInChain.getContainingVertex();
+ rootOfTerminationCriterionVertex = taskInChain.getContainingVertex();
// the fake channel is statically typed to pact record. no data is sent over this channel anyways.
tailConfigOfTerminationCriterion = taskInChain.getTaskConfig();
@@ -1396,14 +1425,14 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
{
// get the vertex for the workset update
final TaskConfig worksetTailConfig;
- JobVertex nextWorksetVertex = (JobVertex) this.vertices.get(nextWorksetNode);
+ JobVertex nextWorksetVertex = this.vertices.get(nextWorksetNode);
if (nextWorksetVertex == null) {
// nextWorksetVertex is chained
TaskInChain taskInChain = this.chainedTasks.get(nextWorksetNode);
if (taskInChain == null) {
throw new CompilerException("Bug: Next workset node not found as vertex or chained task.");
}
- nextWorksetVertex = (JobVertex) taskInChain.getContainingVertex();
+ nextWorksetVertex = taskInChain.getContainingVertex();
worksetTailConfig = taskInChain.getTaskConfig();
} else {
worksetTailConfig = new TaskConfig(nextWorksetVertex.getConfiguration());
@@ -1421,14 +1450,14 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
}
{
final TaskConfig solutionDeltaConfig;
- JobVertex solutionDeltaVertex = (JobVertex) this.vertices.get(solutionDeltaNode);
+ JobVertex solutionDeltaVertex = this.vertices.get(solutionDeltaNode);
if (solutionDeltaVertex == null) {
// last op is chained
TaskInChain taskInChain = this.chainedTasks.get(solutionDeltaNode);
if (taskInChain == null) {
throw new CompilerException("Bug: Solution Set Delta not found as vertex or chained task.");
}
- solutionDeltaVertex = (JobVertex) taskInChain.getContainingVertex();
+ solutionDeltaVertex = taskInChain.getContainingVertex();
solutionDeltaConfig = taskInChain.getTaskConfig();
} else {
solutionDeltaConfig = new TaskConfig(solutionDeltaVertex.getConfiguration());
@@ -1481,7 +1510,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
syncConfig.setConvergenceCriterion(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME, new WorksetEmptyConvergenceCriterion());
}
- private static String getDescriptionForUserCode(UserCodeWrapper<?> wrapper) {
+ private String getDescriptionForUserCode(UserCodeWrapper<?> wrapper) {
try {
if (wrapper.hasObject()) {
try {
@@ -1499,6 +1528,113 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
return null;
}
}
+
+ private void attachOperatorNamesAndDescriptions() {
+ JsonFactory jsonFactory = new JsonFactory();
+
+ // we go back to front
+
+ // start with the in chains
+ for (int i = chainedTasksInSequence.size() - 1; i >= 0; i--) {
+ TaskInChain next = chainedTasksInSequence.get(i);
+ PlanNode planNode = next.getPlanNode();
+
+ JobVertex vertex = next.getContainingVertex();
+
+ // operator
+ String opName = planNode.getOptimizerNode().getOperatorName();
+ if (vertex.getOperatorName() == null) {
+ vertex.setOperatorName(opName);
+ }
+ else {
+ vertex.setOperatorName(opName + " -> " + vertex.getOperatorName());
+ }
+
+ // operator description
+ String opDescription = JsonMapper.getOperatorStrategyString(planNode.getDriverStrategy());
+ if (vertex.getOperatorDescription() == null) {
+ vertex.setOperatorDescription(opDescription);
+ }
+ else {
+ vertex.setOperatorDescription(opDescription + "\n -> " + vertex.getOperatorDescription());
+ }
+
+ // pretty name
+ String prettyName = StringUtils.showControlCharacters(planNode.getNodeName());
+ if (vertex.getOperatorPrettyName() == null) {
+ vertex.setOperatorPrettyName(prettyName);
+ }
+ else {
+ vertex.setOperatorPrettyName(prettyName + "\n -> " + vertex.getOperatorPrettyName());
+ }
+
+ // optimizer output properties
+ if (vertex.getResultOptimizerProperties() == null) {
+ // since we go backwards, this must be the last in its chain
+ String outputProps =
+ JsonMapper.getOptimizerPropertiesJson(jsonFactory, planNode);
+ vertex.setResultOptimizerProperties(outputProps);
+ }
+ }
+
+ // finish back-to-front traversal by going over the head vertices
+ for (Map.Entry<PlanNode, JobVertex> entry : vertices.entrySet()) {
+ PlanNode node = entry.getKey();
+ JobVertex vertex = entry.getValue();
+
+ // get the predecessors
+
+ String input1name = null;
+ String input2name = null;
+ int num = 0;
+ for (Channel c : node.getInputs()) {
+ if (num == 0) {
+ input1name = c.getSource().getNodeName();
+ }
+ else if (num == 1) {
+ input2name = c.getSource().getNodeName();
+ }
+ num++;
+ }
+
+ // operator
+ String opName = node.getOptimizerNode().getOperatorName();
+ if (vertex.getOperatorName() == null) {
+ vertex.setOperatorName(opName);
+ }
+ else {
+ vertex.setOperatorName(opName + " -> " + vertex.getOperatorName());
+ }
+
+ // operator description
+ String opStrategy = JsonMapper.getOperatorStrategyString(
+ node.getDriverStrategy(),
+ input1name != null ? input1name : "(unnamed)",
+ input2name != null ? input2name : "(unnamed)");
+
+ if (vertex.getOperatorDescription() == null) {
+ vertex.setOperatorDescription(opStrategy);
+ }
+ else {
+ vertex.setOperatorDescription(opStrategy + "\n -> " + vertex.getOperatorDescription());
+ }
+
+ // pretty name
+ String prettyName = StringUtils.showControlCharacters(node.getNodeName());
+ if (vertex.getOperatorPrettyName() == null) {
+ vertex.setOperatorPrettyName(prettyName);
+ }
+ else {
+ vertex.setOperatorPrettyName(prettyName + "\n -> " + vertex.getOperatorPrettyName());
+ }
+
+ // if there is not yet an output from a chained task, we set this output
+ if (vertex.getResultOptimizerProperties() == null) {
+ vertex.setResultOptimizerProperties(
+ JsonMapper.getOptimizerPropertiesJson(jsonFactory, node));
+ }
+ }
+ }
// -------------------------------------------------------------------------------------
// Descriptors for tasks / configurations that are chained or merged with other tasks
@@ -1516,15 +1652,24 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
private final String taskName;
+ private final PlanNode planNode;
+
private JobVertex containingVertex;
- TaskInChain(Class<? extends ChainedDriver<?, ?>> chainedTask, TaskConfig taskConfig,
- String taskName) {
+ TaskInChain(PlanNode planNode, Class<? extends ChainedDriver<?, ?>> chainedTask,
+ TaskConfig taskConfig, String taskName) {
+
+ this.planNode = planNode;
this.chainedTask = chainedTask;
this.taskConfig = taskConfig;
this.taskName = taskName;
}
-
+
+
+ public PlanNode getPlanNode() {
+ return planNode;
+ }
+
public Class<? extends ChainedDriver<?, ?>> getChainedTask() {
return this.chainedTask;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java
new file mode 100644
index 0000000..4d81058
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plantranslate;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.api.common.operators.CompilerHints;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+
+import java.io.IOException;
+import java.io.StringWriter;
+
+import static org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator.formatNumber;
+
+public class JsonMapper {
+
+ public static String getOperatorStrategyString(DriverStrategy strategy) {
+ return getOperatorStrategyString(strategy, "input 1", "input 2");
+ }
+
+ public static String getOperatorStrategyString(DriverStrategy strategy, String firstInputName, String secondInputName) {
+ if (strategy == null) {
+ return "(null)";
+ }
+ switch (strategy) {
+ case SOURCE:
+ return "Data Source";
+ case SINK:
+ return "Data Sink";
+
+ case NONE:
+ return "(none)";
+
+ case BINARY_NO_OP:
+ case UNARY_NO_OP:
+ return "No-Op";
+
+ case COLLECTOR_MAP:
+ case MAP:
+ return "Map";
+
+ case FLAT_MAP:
+ return "FlatMap";
+
+ case MAP_PARTITION:
+ return "Map Partition";
+
+ case ALL_REDUCE:
+ return "Reduce All";
+
+ case ALL_GROUP_REDUCE:
+ case ALL_GROUP_REDUCE_COMBINE:
+ return "Group Reduce All";
+
+ case SORTED_REDUCE:
+ return "Sorted Reduce";
+
+ case SORTED_PARTIAL_REDUCE:
+ return "Sorted Combine/Reduce";
+
+ case SORTED_GROUP_REDUCE:
+ return "Sorted Group Reduce";
+
+ case SORTED_GROUP_COMBINE:
+ return "Sorted Combine";
+
+ case HYBRIDHASH_BUILD_FIRST:
+ return "Hybrid Hash (build: " + firstInputName + ")";
+
+ case HYBRIDHASH_BUILD_SECOND:
+ return "Hybrid Hash (build: " + secondInputName + ")";
+
+ case HYBRIDHASH_BUILD_FIRST_CACHED:
+ return "Hybrid Hash (CACHED) (build: " + firstInputName + ")";
+
+ case HYBRIDHASH_BUILD_SECOND_CACHED:
+ return "Hybrid Hash (CACHED) (build: " + secondInputName + ")";
+
+ case NESTEDLOOP_BLOCKED_OUTER_FIRST:
+ return "Nested Loops (Blocked Outer: " + firstInputName + ")";
+ case NESTEDLOOP_BLOCKED_OUTER_SECOND:
+ return "Nested Loops (Blocked Outer: " + secondInputName + ")";
+ case NESTEDLOOP_STREAMED_OUTER_FIRST:
+ return "Nested Loops (Streamed Outer: " + firstInputName + ")";
+ case NESTEDLOOP_STREAMED_OUTER_SECOND:
+ return "Nested Loops (Streamed Outer: " + secondInputName + ")";
+
+ case MERGE:
+ return "Merge";
+
+ case CO_GROUP:
+ return "Co-Group";
+
+ default:
+ return strategy.name();
+ }
+ }
+
+ public static String getShipStrategyString(ShipStrategyType shipType) {
+ if (shipType == null) {
+ return "(null)";
+ }
+ switch (shipType) {
+ case NONE:
+ return "(none)";
+ case FORWARD:
+ return "Forward";
+ case BROADCAST:
+ return "Broadcast";
+ case PARTITION_HASH:
+ return "Hash Partition";
+ case PARTITION_RANGE:
+ return "Range Partition";
+ case PARTITION_RANDOM:
+ return "Redistribute";
+ case PARTITION_FORCED_REBALANCE:
+ return "Rebalance";
+ case PARTITION_CUSTOM:
+ return "Custom Partition";
+ default:
+ return shipType.name();
+ }
+ }
+
+ public static String getLocalStrategyString(LocalStrategy localStrategy) {
+ if (localStrategy == null) {
+ return "(null)";
+ }
+ switch (localStrategy) {
+ case NONE:
+ return "(none)";
+ case SORT:
+ return "Sort";
+ case COMBININGSORT:
+ return "Sort (combining)";
+ default:
+ return localStrategy.name();
+ }
+ }
+
+ public static String getOptimizerPropertiesJson(JsonFactory jsonFactory, PlanNode node) {
+ try {
+ final StringWriter writer = new StringWriter(256);
+ final JsonGenerator gen = jsonFactory.createGenerator(writer);
+
+ final OptimizerNode optNode = node.getOptimizerNode();
+
+ gen.writeStartObject();
+
+ // global properties
+ if (node.getGlobalProperties() != null) {
+ GlobalProperties gp = node.getGlobalProperties();
+ gen.writeArrayFieldStart("global_properties");
+
+ addProperty(gen, "Partitioning", gp.getPartitioning().name());
+ if (gp.getPartitioningFields() != null) {
+ addProperty(gen, "Partitioned on", gp.getPartitioningFields().toString());
+ }
+ if (gp.getPartitioningOrdering() != null) {
+ addProperty(gen, "Partitioning Order", gp.getPartitioningOrdering().toString());
+ }
+ else {
+ addProperty(gen, "Partitioning Order", "(none)");
+ }
+ if (optNode.getUniqueFields() == null || optNode.getUniqueFields().size() == 0) {
+ addProperty(gen, "Uniqueness", "not unique");
+ }
+ else {
+ addProperty(gen, "Uniqueness", optNode.getUniqueFields().toString());
+ }
+
+ gen.writeEndArray();
+ }
+
+ // local properties
+ if (node.getLocalProperties() != null) {
+ LocalProperties lp = node.getLocalProperties();
+ gen.writeArrayFieldStart("local_properties");
+
+ if (lp.getOrdering() != null) {
+ addProperty(gen, "Order", lp.getOrdering().toString());
+ }
+ else {
+ addProperty(gen, "Order", "(none)");
+ }
+ if (lp.getGroupedFields() != null && lp.getGroupedFields().size() > 0) {
+ addProperty(gen, "Grouped on", lp.getGroupedFields().toString());
+ } else {
+ addProperty(gen, "Grouping", "not grouped");
+ }
+ if (optNode.getUniqueFields() == null || optNode.getUniqueFields().size() == 0) {
+ addProperty(gen, "Uniqueness", "not unique");
+ }
+ else {
+ addProperty(gen, "Uniqueness", optNode.getUniqueFields().toString());
+ }
+
+ gen.writeEndArray();
+ }
+
+ // output size estimates
+ {
+ gen.writeArrayFieldStart("estimates");
+
+ addProperty(gen, "Est. Output Size", optNode.getEstimatedOutputSize() == -1 ? "(unknown)"
+ : formatNumber(optNode.getEstimatedOutputSize(), "B"));
+
+ addProperty(gen, "Est. Cardinality", optNode.getEstimatedNumRecords() == -1 ? "(unknown)"
+ : formatNumber(optNode.getEstimatedNumRecords()));
+ gen.writeEndArray();
+ }
+
+ // output node cost
+ if (node.getNodeCosts() != null) {
+ gen.writeArrayFieldStart("costs");
+
+ addProperty(gen, "Network", node.getNodeCosts().getNetworkCost() == -1 ?
+ "(unknown)" : formatNumber(node.getNodeCosts().getNetworkCost(), "B"));
+ addProperty(gen, "Disk I/O", node.getNodeCosts().getDiskCost() == -1 ?
+ "(unknown)" : formatNumber(node.getNodeCosts().getDiskCost(), "B"));
+ addProperty(gen, "CPU", node.getNodeCosts().getCpuCost() == -1 ?
+ "(unknown)" : formatNumber(node.getNodeCosts().getCpuCost(), ""));
+
+ addProperty(gen, "Cumulative Network", node.getCumulativeCosts().getNetworkCost() == -1 ?
+ "(unknown)" : formatNumber(node.getCumulativeCosts().getNetworkCost(), "B"));
+ addProperty(gen, "Cumulative Disk I/O", node.getCumulativeCosts().getDiskCost() == -1 ?
+ "(unknown)" : formatNumber(node.getCumulativeCosts().getDiskCost(), "B"));
+ addProperty(gen, "Cumulative CPU", node.getCumulativeCosts().getCpuCost() == -1 ?
+ "(unknown)" : formatNumber(node.getCumulativeCosts().getCpuCost(), ""));
+
+ gen.writeEndArray();
+ }
+
+
+ // compiler hints
+ if (optNode.getOperator().getCompilerHints() != null) {
+ CompilerHints hints = optNode.getOperator().getCompilerHints();
+ CompilerHints defaults = new CompilerHints();
+
+ String size = hints.getOutputSize() == defaults.getOutputSize() ?
+ "(none)" : String.valueOf(hints.getOutputSize());
+ String card = hints.getOutputCardinality() == defaults.getOutputCardinality() ?
+ "(none)" : String.valueOf(hints.getOutputCardinality());
+ String width = hints.getAvgOutputRecordSize() == defaults.getAvgOutputRecordSize() ?
+ "(none)" : String.valueOf(hints.getAvgOutputRecordSize());
+ String filter = hints.getFilterFactor() == defaults.getFilterFactor() ?
+ "(none)" : String.valueOf(hints.getFilterFactor());
+
+ gen.writeArrayFieldStart("compiler_hints");
+
+ addProperty(gen, "Output Size (bytes)", size);
+ addProperty(gen, "Output Cardinality", card);
+ addProperty(gen, "Avg. Output Record Size (bytes)", width);
+ addProperty(gen, "Filter Factor", filter);
+
+ gen.writeEndArray();
+ }
+
+
+ gen.writeEndObject();
+
+ gen.close();
+ return writer.toString();
+ }
+ catch (Exception e) {
+ return "{}";
+ }
+ }
+
+
+ private static void addProperty(JsonGenerator gen, String name, String value) throws IOException {
+ gen.writeStartObject();
+ gen.writeStringField("name", name);
+ gen.writeStringField("value", value);
+ gen.writeEndObject();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.jade
index 839f91f..82e61db 100644
--- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.jade
@@ -18,18 +18,18 @@
.panel.panel-default.panel-multi(ng-if="node")
.panel-heading.clearfix
.panel-title
- | {{ node.pact }}
+ | {{ node.operator }}
.panel-info.first
| ID: {{ node.id }}
- .panel-info(ng-if="node.contents")
+ .panel-info(ng-if="node.description")
.label-group
bs-label(status="{{status}}" ng-repeat="(index, status) in stateList") {{node.vertex.groupvertex[status]}}
.panel-heading.clearfix
- .panel-info.first.last(ng-if="node.contents")
- span {{ node.contents }}
+ .panel-info.first.last(ng-if="node.description")
+ span {{ node.description }}
.panel-body
table.table.table-hover.table-clickable
@@ -79,41 +79,34 @@
thead
tr
th(colspan="2")
- | Pact Properties
+ | Properties
tbody
tr
td Operator
- td(table-property value="node.driver_strategy")
+ td(table-property value="node.operator_strategy")
tr
td Parallelism
td(table-property value="node.parallelism")
- tr
- td Subtasks-per-instance
- td(table-property value="node.subtasks_per_instance")
-
.hidden-sm.col-md-4
table.table.table-properties
thead
tr
th(colspan="2")
- | Pact Properties
+ | Properties
tbody
tr
td Operator
- td(table-property value="node.driver_strategy")
+ td(table-property value="node.operator_strategy")
tr
td Parallelism
td(table-property value="node.parallelism")
- tr
- td Subtasks-per-instance
- td(table-property value="node.subtasks_per_instance")
table.table.table-properties(ng-if="node.estimates")
thead
http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.dir.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.dir.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.dir.coffee
index bb4d925..ee309ad 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.dir.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.dir.coffee
@@ -235,11 +235,6 @@ angular.module('flinkApp')
'node-iteration'
else
- if el.pact is "Data Source"
- 'node-source'
- else if el.pact is "Data Sink"
- 'node-sink'
- else
'node-normal'
# creates the label of a node, in info is stored, whether it is a special node (like a mirror in an iteration)
@@ -248,13 +243,13 @@ angular.module('flinkApp')
# Nodename
if info is "mirror"
- labelValue += "<h3 class='node-name'>Mirror of " + el.pact + "</h3>"
+ labelValue += "<h3 class='node-name'>Mirror of " + el.operator + "</h3>"
else
- labelValue += "<h3 class='node-name'>" + el.pact + "</h3>"
- if el.contents is ""
+ labelValue += "<h3 class='node-name'>" + el.operator + "</h3>"
+ if el.description is ""
labelValue += ""
else
- stepName = el.contents
+ stepName = el.description
# clean stepName
stepName = shortenString(stepName)
@@ -268,7 +263,7 @@ angular.module('flinkApp')
# Otherwise add infos
labelValue += "<h5>" + info + " Node</h5>" if isSpecialIterationNode(info)
labelValue += "<h5>Parallelism: " + el.parallelism + "</h5>" unless el.parallelism is ""
- labelValue += "<h5>Driver Strategy: " + shortenString(el.driver_strategy) + "</h5" unless el.driver_strategy is `undefined`
+ labelValue += "<h5>Operation: " + shortenString(el.operator_strategy) + "</h5>" unless el.operator is `undefined`
labelValue += "</a>"
labelValue
@@ -398,9 +393,9 @@ angular.module('flinkApp')
existingNodes.push el.id
- # create edges from predecessors to current node
- if el.predecessors?
- for pred in el.predecessors
+ # create edges from inputs to current node
+ if el.inputs?
+ for pred in el.inputs
createEdge(g, data, el, existingNodes, pred)
g