Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:06:51 UTC
[12/53] [abbrv] flink git commit: [optimizer] Rename optimizer
project to "flink-optimizer" (previously flink-compiler)
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/WorksetIterationPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/WorksetIterationPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/WorksetIterationPlanNode.java
new file mode 100644
index 0000000..95adace
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/WorksetIterationPlanNode.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plan;
+
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.costs.Costs;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dag.WorksetIterationNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.util.Visitor;
+
+/**
+ * A node in the execution plan representing a workset iteration (delta iteration).
+ */
+public class WorksetIterationPlanNode extends DualInputPlanNode implements IterationPlanNode {
+
+ private final SolutionSetPlanNode solutionSetPlanNode;
+
+ private final WorksetPlanNode worksetPlanNode;
+
+ private final PlanNode solutionSetDeltaPlanNode;
+
+ private final PlanNode nextWorkSetPlanNode;
+
+ private TypeSerializerFactory<?> worksetSerializer;
+
+ private TypeSerializerFactory<?> solutionSetSerializer;
+
+ private TypeComparatorFactory<?> solutionSetComparator;
+
+ private boolean immediateSolutionSetUpdate;
+
+ public Object postPassHelper;
+
+ private TypeSerializerFactory<?> serializerForIterationChannel;
+
+ // --------------------------------------------------------------------------------------------
+
+ public WorksetIterationPlanNode(WorksetIterationNode template, String nodeName, Channel initialSolutionSet, Channel initialWorkset,
+ SolutionSetPlanNode solutionSetPlanNode, WorksetPlanNode worksetPlanNode,
+ PlanNode nextWorkSetPlanNode, PlanNode solutionSetDeltaPlanNode)
+ {
+ super(template, nodeName, initialSolutionSet, initialWorkset, DriverStrategy.BINARY_NO_OP);
+ this.solutionSetPlanNode = solutionSetPlanNode;
+ this.worksetPlanNode = worksetPlanNode;
+ this.solutionSetDeltaPlanNode = solutionSetDeltaPlanNode;
+ this.nextWorkSetPlanNode = nextWorkSetPlanNode;
+
+ mergeBranchPlanMaps();
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public WorksetIterationNode getIterationNode() {
+ if (this.template instanceof WorksetIterationNode) {
+ return (WorksetIterationNode) this.template;
+ } else {
+ throw new RuntimeException("The template of a WorksetIterationPlanNode must be a WorksetIterationNode.");
+ }
+ }
+
+ public SolutionSetPlanNode getSolutionSetPlanNode() {
+ return this.solutionSetPlanNode;
+ }
+
+ public WorksetPlanNode getWorksetPlanNode() {
+ return this.worksetPlanNode;
+ }
+
+ public PlanNode getSolutionSetDeltaPlanNode() {
+ return this.solutionSetDeltaPlanNode;
+ }
+
+ public PlanNode getNextWorkSetPlanNode() {
+ return this.nextWorkSetPlanNode;
+ }
+
+ public Channel getInitialSolutionSetInput() {
+ return getInput1();
+ }
+
+ public Channel getInitialWorksetInput() {
+ return getInput2();
+ }
+
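+ /**
+ * Marks whether the solution set delta is applied to the solution set directly
+ * where it is produced, rather than through a separate update step at the
+ * superstep boundary (a rough description of the semantics; the plan
+ * translation decides how the update is actually wired).
+ */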
+ public void setImmediateSolutionSetUpdate(boolean immediateUpdate) {
+ this.immediateSolutionSetUpdate = immediateUpdate;
+ }
+
+ public boolean isImmediateSolutionSetUpdate() {
+ return this.immediateSolutionSetUpdate;
+ }
+
+ public FieldList getSolutionSetKeyFields() {
+ return getIterationNode().getSolutionSetKeyFields();
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public TypeSerializerFactory<?> getWorksetSerializer() {
+ return worksetSerializer;
+ }
+
+ public void setWorksetSerializer(TypeSerializerFactory<?> worksetSerializer) {
+ this.worksetSerializer = worksetSerializer;
+ }
+
+ public TypeSerializerFactory<?> getSolutionSetSerializer() {
+ return solutionSetSerializer;
+ }
+
+ public void setSolutionSetSerializer(TypeSerializerFactory<?> solutionSetSerializer) {
+ this.solutionSetSerializer = solutionSetSerializer;
+ }
+
+ public TypeComparatorFactory<?> getSolutionSetComparator() {
+ return solutionSetComparator;
+ }
+
+ public void setSolutionSetComparator(TypeComparatorFactory<?> solutionSetComparator) {
+ this.solutionSetComparator = solutionSetComparator;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public void setCosts(Costs nodeCosts) {
+ // add the costs from the step function
+ nodeCosts.addCosts(this.solutionSetDeltaPlanNode.getCumulativeCostsShare());
+ nodeCosts.addCosts(this.nextWorkSetPlanNode.getCumulativeCostsShare());
+
+ super.setCosts(nodeCosts);
+ }
+
+ public int getMemoryConsumerWeight() {
+ // solution set index and workset back channel
+ return 2;
+ }
+
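+ // A "dam" is a point where the data flow is fully materialized before any
+ // records are emitted downstream. The solution set hash index always
+ // materializes its input, which is why finding the source behind this node
+ // is reported as FOUND_SOURCE_AND_DAM.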
+ @Override
+ public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
+ if (source == this) {
+ return FOUND_SOURCE;
+ }
+
+ SourceAndDamReport fromOutside = super.hasDamOnPathDownTo(source);
+
+ if (fromOutside == FOUND_SOURCE_AND_DAM) {
+ return FOUND_SOURCE_AND_DAM;
+ }
+ else if (fromOutside == FOUND_SOURCE) {
+ // we always have a dam in the solution set index
+ return FOUND_SOURCE_AND_DAM;
+ } else {
+ SourceAndDamReport fromNextWorkset = nextWorkSetPlanNode.hasDamOnPathDownTo(source);
+
+ if (fromNextWorkset == FOUND_SOURCE_AND_DAM){
+ return FOUND_SOURCE_AND_DAM;
+ } else if (fromNextWorkset == FOUND_SOURCE){
+ return FOUND_SOURCE_AND_DAM;
+ } else {
+ return this.solutionSetDeltaPlanNode.hasDamOnPathDownTo(source);
+ }
+ }
+ }
+
+ @Override
+ public void acceptForStepFunction(Visitor<PlanNode> visitor) {
+ this.solutionSetDeltaPlanNode.accept(visitor);
+ this.nextWorkSetPlanNode.accept(visitor);
+ }
+
+ /**
+ * Merging can only take place after the solutionSetDelta and nextWorkset plan
+ * nodes have been set, because they may also contain some of the branching nodes.
+ */
+ @Override
+ protected void mergeBranchPlanMaps(Map<OptimizerNode, PlanNode> branchPlan1, Map<OptimizerNode, PlanNode> branchPlan2) {
+ // intentionally a no-op: the merge happens in the parameterless overload
+ // below, once the solution set delta and next workset are available
+ }
+
+
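+ // The branch plan maps each OptimizerNode at which the data flow branched
+ // to the plan candidate selected for it, so that paths which reconverge
+ // later are built over the same physical sub-plan.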
+ protected void mergeBranchPlanMaps() {
+ Map<OptimizerNode, PlanNode> branchPlan1 = input1.getSource().branchPlan;
+ Map<OptimizerNode, PlanNode> branchPlan2 = input2.getSource().branchPlan;
+
+ // merge the branchPlan maps according to the template's stack of unclosed branches
+ if (this.template.hasUnclosedBranches()) {
+ if (this.branchPlan == null) {
+ this.branchPlan = new HashMap<OptimizerNode, PlanNode>(8);
+ }
+
+ for (OptimizerNode.UnclosedBranchDescriptor uc : this.template.getOpenBranches()) {
+ OptimizerNode brancher = uc.getBranchingNode();
+ PlanNode selectedCandidate = null;
+
+ if (branchPlan1 != null) {
+ // predecessor 1 has branching children, see if it got the branch we are looking for
+ selectedCandidate = branchPlan1.get(brancher);
+ }
+
+ if (selectedCandidate == null && branchPlan2 != null) {
+ // predecessor 2 has branching children, see if it got the branch we are looking for
+ selectedCandidate = branchPlan2.get(brancher);
+ }
+
+ if (selectedCandidate == null && getSolutionSetDeltaPlanNode() != null &&
+ getSolutionSetDeltaPlanNode().branchPlan != null) {
+ selectedCandidate = getSolutionSetDeltaPlanNode().branchPlan.get(brancher);
+ }
+
+ if (selectedCandidate == null && getNextWorkSetPlanNode() != null &&
+ getNextWorkSetPlanNode().branchPlan != null) {
+ selectedCandidate = getNextWorkSetPlanNode().branchPlan.get(brancher);
+ }
+
+ if (selectedCandidate == null) {
+ throw new CompilerException(
+ "Candidates for a node with open branches are missing information about the selected candidate.");
+ }
+
+ this.branchPlan.put(brancher, selectedCandidate);
+ }
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public TypeSerializerFactory<?> getSerializerForIterationChannel() {
+ return serializerForIterationChannel;
+ }
+
+ public void setSerializerForIterationChannel(TypeSerializerFactory<?> serializerForIterationChannel) {
+ this.serializerForIterationChannel = serializerForIterationChannel;
+ }
+}
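
For orientation: the plan node above is what the optimizer produces for a delta
iteration written against the DataSet API. The sketch below is illustrative only
(class name and data are made up, and it is not part of this commit); it shows
where the solution set, workset, delta, and next workset that the plan node
tracks come from.

    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DeltaIteration;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class DeltaIterationSketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Tuple2<Long, Double>> initialSolutionSet = env.fromElements(
                    new Tuple2<Long, Double>(1L, 1.0), new Tuple2<Long, Double>(2L, 2.0));
            DataSet<Tuple2<Long, Double>> initialWorkset = env.fromElements(
                    new Tuple2<Long, Double>(1L, 0.5), new Tuple2<Long, Double>(2L, 0.25));

            // at most 10 supersteps; tuple field 0 is the solution set key
            DeltaIteration<Tuple2<Long, Double>, Tuple2<Long, Double>> iteration =
                    initialSolutionSet.iterateDelta(initialWorkset, 10, 0);

            // step function: join the workset with the solution set and keep the
            // smaller value per key
            DataSet<Tuple2<Long, Double>> delta = iteration.getWorkset()
                    .join(iteration.getSolutionSet())
                    .where(0).equalTo(0)
                    .with(new JoinFunction<Tuple2<Long, Double>, Tuple2<Long, Double>, Tuple2<Long, Double>>() {
                        @Override
                        public Tuple2<Long, Double> join(Tuple2<Long, Double> ws, Tuple2<Long, Double> ss) {
                            return ws.f1 < ss.f1 ? ws : ss;
                        }
                    });

            // the delta updates the solution set and here also serves as the next workset
            DataSet<Tuple2<Long, Double>> result = iteration.closeWith(delta, delta);
            result.print();
        }
    }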
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/WorksetPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/WorksetPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/WorksetPlanNode.java
new file mode 100644
index 0000000..8d044d6
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/WorksetPlanNode.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.plan;
+
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
+
+import java.util.Collections;
+import java.util.HashMap;
+
+import org.apache.flink.optimizer.costs.Costs;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dag.WorksetNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.runtime.operators.DamBehavior;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.util.Visitor;
+
+/**
+ * Plan candidate node for the workset of a workset (delta) iteration.
+ */
+public class WorksetPlanNode extends PlanNode {
+
+ private static final Costs NO_COSTS = new Costs();
+
+ private WorksetIterationPlanNode containingIterationNode;
+
+ private final Channel initialInput;
+
+ public Object postPassHelper;
+
+
+ public WorksetPlanNode(WorksetNode template, String nodeName,
+ GlobalProperties gProps, LocalProperties lProps,
+ Channel initialInput)
+ {
+ super(template, nodeName, DriverStrategy.NONE);
+
+ this.globalProps = gProps;
+ this.localProps = lProps;
+ this.initialInput = initialInput;
+
+ // the node incurs no cost
+ this.nodeCosts = NO_COSTS;
+ this.cumulativeCosts = NO_COSTS;
+
+ if (initialInput.getSource().branchPlan != null && initialInput.getSource().branchPlan.size() > 0) {
+ if (this.branchPlan == null) {
+ this.branchPlan = new HashMap<OptimizerNode, PlanNode>();
+ }
+
+ this.branchPlan.putAll(initialInput.getSource().branchPlan);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public WorksetNode getWorksetNode() {
+ return (WorksetNode) this.template;
+ }
+
+ public WorksetIterationPlanNode getContainingIterationNode() {
+ return this.containingIterationNode;
+ }
+
+ public void setContainingIterationNode(WorksetIterationPlanNode containingIterationNode) {
+ this.containingIterationNode = containingIterationNode;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+
+ @Override
+ public void accept(Visitor<PlanNode> visitor) {
+ if (visitor.preVisit(this)) {
+ visitor.postVisit(this);
+ }
+ }
+
+
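+ // Within the step function the workset acts as a leaf: it reports no
+ // predecessors and no inputs. Its actual input, the initial workset (and
+ // the iteration's back channel at runtime), is tracked separately in
+ // 'initialInput'.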
+ @Override
+ public Iterable<PlanNode> getPredecessors() {
+ return Collections.<PlanNode>emptyList();
+ }
+
+
+ @Override
+ public Iterable<Channel> getInputs() {
+ return Collections.<Channel>emptyList();
+ }
+
+
+ @Override
+ public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
+ if (source == this) {
+ return FOUND_SOURCE;
+ }
+ SourceAndDamReport res = this.initialInput.getSource().hasDamOnPathDownTo(source);
+ if (res == FOUND_SOURCE_AND_DAM) {
+ return FOUND_SOURCE_AND_DAM;
+ }
+ else if (res == FOUND_SOURCE) {
+ return (this.initialInput.getLocalStrategy().dams() ||
+ this.initialInput.getTempMode().breaksPipeline() ||
+ getDriverStrategy().firstDam() == DamBehavior.FULL_DAM) ?
+ FOUND_SOURCE_AND_DAM : FOUND_SOURCE;
+ }
+ else {
+ return NOT_FOUND;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/DumpableConnection.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/DumpableConnection.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/DumpableConnection.java
new file mode 100644
index 0000000..3f8cb46
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/DumpableConnection.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plandump;
+
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+
+
+/**
+ * A connection between two {@link DumpableNode}s, used by the plan dump code to
+ * traverse optimizer DAGs and plan candidates through a single interface.
+ */
+public interface DumpableConnection<T extends DumpableNode<T>> {
+
+ DumpableNode<T> getSource();
+
+ ShipStrategyType getShipStrategy();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/DumpableNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/DumpableNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/DumpableNode.java
new file mode 100644
index 0000000..1bc0f0c
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/DumpableNode.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plandump;
+
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.plan.PlanNode;
+
+/**
+ * A node that can be dumped. Implemented by both the optimizer's DAG nodes and
+ * the plan candidate nodes, so that the same JSON generator can render plans
+ * before and after optimization.
+ */
+public interface DumpableNode<T extends DumpableNode<T>> {
+
+ /**
+ * Gets an iterator over the predecessors.
+ *
+ * @return An iterator over the predecessors.
+ */
+ Iterable<T> getPredecessors();
+
+ Iterable<DumpableConnection<T>> getDumpableInputs();
+
+ OptimizerNode getOptimizerNode();
+
+ PlanNode getPlanNode();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
new file mode 100644
index 0000000..6f918c0
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
@@ -0,0 +1,657 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plandump;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.flink.api.common.operators.CompilerHints;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.dag.BinaryUnionNode;
+import org.apache.flink.optimizer.dag.BulkIterationNode;
+import org.apache.flink.optimizer.dag.DataSinkNode;
+import org.apache.flink.optimizer.dag.DataSourceNode;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dag.DagConnection;
+import org.apache.flink.optimizer.dag.TempMode;
+import org.apache.flink.optimizer.dag.WorksetIterationNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
+import org.apache.flink.optimizer.util.Utils;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.util.StringUtils;
+
+/**
+ * Utility that traverses a plan (an optimizer DAG of data sinks, or an
+ * {@link OptimizedPlan}) and renders it as JSON, e.g. for visualization in a
+ * web frontend.
+ */
+public class PlanJSONDumpGenerator {
+
+ private Map<DumpableNode<?>, Integer> nodeIds; // resolves pact nodes to ids
+
+ private int nodeCnt;
+
+ private boolean encodeForHTML;
+
+ // --------------------------------------------------------------------------------------------
+
+ public void setEncodeForHTML(boolean encodeForHTML) {
+ this.encodeForHTML = encodeForHTML;
+ }
+
+ public boolean isEncodeForHTML() {
+ return encodeForHTML;
+ }
+
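+ // Typical usage (illustrative sketch; 'optimizedPlan' stands for an
+ // OptimizedPlan obtained from the optimizer):
+ //
+ //     PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
+ //     dumper.setEncodeForHTML(true);
+ //     String json = dumper.getOptimizerPlanAsJSON(optimizedPlan);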
+
+ public void dumpPactPlanAsJSON(List<DataSinkNode> nodes, PrintWriter writer) {
+ @SuppressWarnings("unchecked")
+ List<DumpableNode<?>> n = (List<DumpableNode<?>>) (List<?>) nodes;
+ compilePlanToJSON(n, writer);
+ }
+
+ public String getPactPlanAsJSON(List<DataSinkNode> nodes) {
+ StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw);
+ dumpPactPlanAsJSON(nodes, pw);
+ return sw.toString();
+ }
+
+ public void dumpOptimizerPlanAsJSON(OptimizedPlan plan, File toFile) throws IOException {
+ PrintWriter pw = null;
+ try {
+ pw = new PrintWriter(new FileOutputStream(toFile), false);
+ dumpOptimizerPlanAsJSON(plan, pw);
+ pw.flush();
+ } finally {
+ if (pw != null) {
+ pw.close();
+ }
+ }
+ }
+
+ public String getOptimizerPlanAsJSON(OptimizedPlan plan) {
+ StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw);
+ dumpOptimizerPlanAsJSON(plan, pw);
+ pw.close();
+ return sw.toString();
+ }
+
+ public void dumpOptimizerPlanAsJSON(OptimizedPlan plan, PrintWriter writer) {
+ Collection<SinkPlanNode> sinks = plan.getDataSinks();
+ if (sinks instanceof List) {
+ dumpOptimizerPlanAsJSON((List<SinkPlanNode>) sinks, writer);
+ } else {
+ List<SinkPlanNode> n = new ArrayList<SinkPlanNode>();
+ n.addAll(sinks);
+ dumpOptimizerPlanAsJSON(n, writer);
+ }
+ }
+
+ public void dumpOptimizerPlanAsJSON(List<SinkPlanNode> nodes, PrintWriter writer) {
+ @SuppressWarnings("unchecked")
+ List<DumpableNode<?>> n = (List<DumpableNode<?>>) (List<?>) nodes;
+ compilePlanToJSON(n, writer);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private void compilePlanToJSON(List<DumpableNode<?>> nodes, PrintWriter writer) {
+ // initialization to assign node ids
+ this.nodeIds = new HashMap<DumpableNode<?>, Integer>();
+ this.nodeCnt = 0;
+
+ // JSON header
+ writer.print("{\n\t\"nodes\": [\n\n");
+
+ // Generate JSON for plan
+ for (int i = 0; i < nodes.size(); i++) {
+ visit(nodes.get(i), writer, i == 0);
+ }
+
+ // JSON Footer
+ writer.println("\n\t]\n}");
+ }
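+ // The generated document has the following shape (illustrative, abbreviated):
+ //
+ //     {
+ //         "nodes": [
+ //             { "id": 0, "type": "source", "pact": "...", "contents": "...", ... },
+ //             { "id": 1, "type": "pact", "predecessors": [ { "id": 0, ... } ], ... }
+ //         ]
+ //     }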
+
+ private boolean visit(DumpableNode<?> node, PrintWriter writer, boolean first) {
+ // check for duplicate traversal
+ if (this.nodeIds.containsKey(node)) {
+ return false;
+ }
+
+ // assign an id first
+ this.nodeIds.put(node, this.nodeCnt++);
+
+ // then recurse
+ for (DumpableNode<?> child : node.getPredecessors()) {
+ // only clear 'first' if the child was actually dumped here; a child
+ // that was already visited must not flip the flag
+ if (visit(child, writer, first)) {
+ first = false;
+ }
+ }
+
+ // the optimizer node carries the type, name, and estimate information
+ final OptimizerNode n = node.getOptimizerNode();
+
+ // ------------------ dump after the ascend ---------------------
+ // start a new node and output node id
+ if (!first) {
+ writer.print(",\n");
+ }
+ // open the node
+ writer.print("\t{\n");
+
+ // recurse into the step function, if this is an iteration node
+ if (node instanceof BulkIterationNode || node instanceof BulkIterationPlanNode) {
+
+ DumpableNode<?> innerChild = node instanceof BulkIterationNode ?
+ ((BulkIterationNode) node).getNextPartialSolution() :
+ ((BulkIterationPlanNode) node).getRootOfStepFunction();
+
+ DumpableNode<?> begin = node instanceof BulkIterationNode ?
+ ((BulkIterationNode) node).getPartialSolution() :
+ ((BulkIterationPlanNode) node).getPartialSolutionPlanNode();
+
+ writer.print("\t\t\"step_function\": [\n");
+
+ visit(innerChild, writer, true);
+
+ writer.print("\n\t\t],\n");
+ writer.print("\t\t\"partial_solution\": " + this.nodeIds.get(begin) + ",\n");
+ writer.print("\t\t\"next_partial_solution\": " + this.nodeIds.get(innerChild) + ",\n");
+ } else if (node instanceof WorksetIterationNode || node instanceof WorksetIterationPlanNode) {
+
+ DumpableNode<?> worksetRoot = node instanceof WorksetIterationNode ?
+ ((WorksetIterationNode) node).getNextWorkset() :
+ ((WorksetIterationPlanNode) node).getNextWorkSetPlanNode();
+ DumpableNode<?> solutionDelta = node instanceof WorksetIterationNode ?
+ ((WorksetIterationNode) node).getSolutionSetDelta() :
+ ((WorksetIterationPlanNode) node).getSolutionSetDeltaPlanNode();
+
+ DumpableNode<?> workset = node instanceof WorksetIterationNode ?
+ ((WorksetIterationNode) node).getWorksetNode() :
+ ((WorksetIterationPlanNode) node).getWorksetPlanNode();
+ DumpableNode<?> solutionSet = node instanceof WorksetIterationNode ?
+ ((WorksetIterationNode) node).getSolutionSetNode() :
+ ((WorksetIterationPlanNode) node).getSolutionSetPlanNode();
+
+ writer.print("\t\t\"step_function\": [\n");
+
+ visit(worksetRoot, writer, true);
+ visit(solutionDelta, writer, false);
+
+ writer.print("\n\t\t],\n");
+ writer.print("\t\t\"workset\": " + this.nodeIds.get(workset) + ",\n");
+ writer.print("\t\t\"solution_set\": " + this.nodeIds.get(solutionSet) + ",\n");
+ writer.print("\t\t\"next_workset\": " + this.nodeIds.get(worksetRoot) + ",\n");
+ writer.print("\t\t\"solution_delta\": " + this.nodeIds.get(solutionDelta) + ",\n");
+ }
+
+ // print the id
+ writer.print("\t\t\"id\": " + this.nodeIds.get(node));
+
+
+ final String type;
+ String contents;
+ if (n instanceof DataSinkNode) {
+ type = "sink";
+ contents = n.getOperator().toString();
+ } else if (n instanceof DataSourceNode) {
+ type = "source";
+ contents = n.getOperator().toString();
+ }
+ else if (n instanceof BulkIterationNode) {
+ type = "bulk_iteration";
+ contents = n.getOperator().getName();
+ }
+ else if (n instanceof WorksetIterationNode) {
+ type = "workset_iteration";
+ contents = n.getOperator().getName();
+ }
+ else if (n instanceof BinaryUnionNode) {
+ type = "pact";
+ contents = "";
+ }
+ else {
+ type = "pact";
+ contents = n.getOperator().getName();
+ }
+
+ contents = StringUtils.showControlCharacters(contents);
+ if (encodeForHTML) {
+ contents = StringEscapeUtils.escapeHtml4(contents);
+ contents = contents.replace("\\", "&#92;");
+ }
+
+
+ String name = n.getName();
+ if (name.equals("Reduce") && (node instanceof SingleInputPlanNode) &&
+ ((SingleInputPlanNode) node).getDriverStrategy() == DriverStrategy.SORTED_GROUP_COMBINE) {
+ name = "Combine";
+ }
+
+ // output the type identifier
+ writer.print(",\n\t\t\"type\": \"" + type + "\"");
+
+ // output node name
+ writer.print(",\n\t\t\"pact\": \"" + name + "\"");
+
+ // output node contents
+ writer.print(",\n\t\t\"contents\": \"" + contents + "\"");
+
+ // degree of parallelism
+ writer.print(",\n\t\t\"parallelism\": \""
+ + (n.getParallelism() >= 1 ? n.getParallelism() : "default") + "\"");
+
+ // output node predecessors
+ Iterator<? extends DumpableConnection<?>> inConns = node.getDumpableInputs().iterator();
+ String child1name = "", child2name = "";
+
+ if (inConns.hasNext()) {
+ // start predecessor list
+ writer.print(",\n\t\t\"predecessors\": [");
+ int inputNum = 0;
+
+ while (inConns.hasNext()) {
+ final DumpableConnection<?> inConn = inConns.next();
+ final DumpableNode<?> source = inConn.getSource();
+ writer.print(inputNum == 0 ? "\n" : ",\n");
+ if (inputNum == 0) {
+ child1name += child1name.length() > 0 ? ", " : "";
+ child1name += source.getOptimizerNode().getOperator().getName();
+ } else if (inputNum == 1) {
+ child2name += child2name.length() > 0 ? ", " : "";
+ child2name += source.getOptimizerNode().getOperator().getName();
+ }
+
+ // output predecessor id
+ writer.print("\t\t\t{\"id\": " + this.nodeIds.get(source));
+
+ // output connection side
+ if (inConns.hasNext() || inputNum > 0) {
+ writer.print(", \"side\": \"" + (inputNum == 0 ? "first" : "second") + "\"");
+ }
+ // output shipping strategy and channel type
+ final Channel channel = (inConn instanceof Channel) ? (Channel) inConn : null;
+ final ShipStrategyType shipType = channel != null ? channel.getShipStrategy() :
+ ((DagConnection) inConn).getShipStrategy();
+
+ String shipStrategy = null;
+ if (shipType != null) {
+ switch (shipType) {
+ case NONE:
+ // nothing
+ break;
+ case FORWARD:
+ shipStrategy = "Forward";
+ break;
+ case BROADCAST:
+ shipStrategy = "Broadcast";
+ break;
+ case PARTITION_HASH:
+ shipStrategy = "Hash Partition";
+ break;
+ case PARTITION_RANGE:
+ shipStrategy = "Range Partition";
+ break;
+ case PARTITION_RANDOM:
+ shipStrategy = "Redistribute";
+ break;
+ case PARTITION_FORCED_REBALANCE:
+ shipStrategy = "Rebalance";
+ break;
+ case PARTITION_CUSTOM:
+ shipStrategy = "Custom Partition";
+ break;
+ default:
+ throw new CompilerException("Unknown ship strategy '" + inConn.getShipStrategy().name()
+ + "' in JSON generator.");
+ }
+ }
+
+ if (shipStrategy != null && channel != null && channel.getShipStrategyKeys() != null && channel.getShipStrategyKeys().size() > 0) {
+ shipStrategy += " on " + (channel.getShipStrategySortOrder() == null ?
+ channel.getShipStrategyKeys().toString() :
+ Utils.createOrdering(channel.getShipStrategyKeys(), channel.getShipStrategySortOrder()).toString());
+ }
+
+ if (shipStrategy != null) {
+ writer.print(", \"ship_strategy\": \"" + shipStrategy + "\"");
+ }
+
+ if (channel != null) {
+ String localStrategy = null;
+ switch (channel.getLocalStrategy()) {
+ case NONE:
+ break;
+ case SORT:
+ localStrategy = "Sort";
+ break;
+ case COMBININGSORT:
+ localStrategy = "Sort (combining)";
+ break;
+ default:
+ throw new CompilerException("Unknown local strategy " + channel.getLocalStrategy().name());
+ }
+
+ if (localStrategy != null && channel.getLocalStrategyKeys() != null && channel.getLocalStrategyKeys().size() > 0) {
+ localStrategy += " on " + (channel.getLocalStrategySortOrder() == null ?
+ channel.getLocalStrategyKeys().toString() :
+ Utils.createOrdering(channel.getLocalStrategyKeys(), channel.getLocalStrategySortOrder()).toString());
+ }
+
+ if (localStrategy != null) {
+ writer.print(", \"local_strategy\": \"" + localStrategy + "\"");
+ }
+
+ if (channel.getTempMode() != TempMode.NONE) {
+ String tempMode = channel.getTempMode().toString();
+ writer.print(", \"temp_mode\": \"" + tempMode + "\"");
+ }
+ }
+
+ writer.print('}');
+ inputNum++;
+ }
+ // finish predecessors
+ writer.print("\n\t\t]");
+ }
+
+ //---------------------------------------------------------------------------------------
+ // the part below here is relevant only to plan nodes with concrete strategies, etc
+ //---------------------------------------------------------------------------------------
+
+ final PlanNode p = node.getPlanNode();
+ if (p == null) {
+ // finish node
+ writer.print("\n\t}");
+ return true;
+ }
+ // local strategy
+ String locString = null;
+ if (p.getDriverStrategy() != null) {
+ switch (p.getDriverStrategy()) {
+ case NONE:
+ case BINARY_NO_OP:
+ break;
+
+ case UNARY_NO_OP:
+ locString = "No-Op";
+ break;
+
+ case COLLECTOR_MAP:
+ case MAP:
+ locString = "Map";
+ break;
+
+ case FLAT_MAP:
+ locString = "FlatMap";
+ break;
+
+ case MAP_PARTITION:
+ locString = "Map Partition";
+ break;
+
+ case ALL_REDUCE:
+ locString = "Reduce All";
+ break;
+
+ case ALL_GROUP_REDUCE:
+ case ALL_GROUP_REDUCE_COMBINE:
+ locString = "Group Reduce All";
+ break;
+
+ case SORTED_REDUCE:
+ locString = "Sorted Reduce";
+ break;
+
+ case SORTED_PARTIAL_REDUCE:
+ locString = "Sorted Combine/Reduce";
+ break;
+
+ case SORTED_GROUP_REDUCE:
+ locString = "Sorted Group Reduce";
+ break;
+
+ case SORTED_GROUP_COMBINE:
+ locString = "Sorted Combine";
+ break;
+
+ case HYBRIDHASH_BUILD_FIRST:
+ locString = "Hybrid Hash (build: " + child1name + ")";
+ break;
+ case HYBRIDHASH_BUILD_SECOND:
+ locString = "Hybrid Hash (build: " + child2name + ")";
+ break;
+
+ case HYBRIDHASH_BUILD_FIRST_CACHED:
+ locString = "Hybrid Hash (CACHED) (build: " + child1name + ")";
+ break;
+ case HYBRIDHASH_BUILD_SECOND_CACHED:
+ locString = "Hybrid Hash (CACHED) (build: " + child2name + ")";
+ break;
+
+ case NESTEDLOOP_BLOCKED_OUTER_FIRST:
+ locString = "Nested Loops (Blocked Outer: " + child1name + ")";
+ break;
+ case NESTEDLOOP_BLOCKED_OUTER_SECOND:
+ locString = "Nested Loops (Blocked Outer: " + child2name + ")";
+ break;
+ case NESTEDLOOP_STREAMED_OUTER_FIRST:
+ locString = "Nested Loops (Streamed Outer: " + child1name + ")";
+ break;
+ case NESTEDLOOP_STREAMED_OUTER_SECOND:
+ locString = "Nested Loops (Streamed Outer: " + child2name + ")";
+ break;
+
+ case MERGE:
+ locString = "Merge";
+ break;
+
+ case CO_GROUP:
+ locString = "Co-Group";
+ break;
+
+ default:
+ locString = p.getDriverStrategy().name();
+ break;
+ }
+
+ if (locString != null) {
+ writer.print(",\n\t\t\"driver_strategy\": \"");
+ writer.print(locString);
+ writer.print("\"");
+ }
+ }
+
+ {
+ // output node global properties
+ final GlobalProperties gp = p.getGlobalProperties();
+
+ writer.print(",\n\t\t\"global_properties\": [\n");
+
+ addProperty(writer, "Partitioning", gp.getPartitioning().name(), true);
+ if (gp.getPartitioningFields() != null) {
+ addProperty(writer, "Partitioned on", gp.getPartitioningFields().toString(), false);
+ }
+ if (gp.getPartitioningOrdering() != null) {
+ addProperty(writer, "Partitioning Order", gp.getPartitioningOrdering().toString(), false);
+ }
+ else {
+ addProperty(writer, "Partitioning Order", "(none)", false);
+ }
+ if (n.getUniqueFields() == null || n.getUniqueFields().size() == 0) {
+ addProperty(writer, "Uniqueness", "not unique", false);
+ }
+ else {
+ addProperty(writer, "Uniqueness", n.getUniqueFields().toString(), false);
+ }
+
+ writer.print("\n\t\t]");
+ }
+
+ {
+ // output node local properties
+ LocalProperties lp = p.getLocalProperties();
+
+ writer.print(",\n\t\t\"local_properties\": [\n");
+
+ if (lp.getOrdering() != null) {
+ addProperty(writer, "Order", lp.getOrdering().toString(), true);
+ }
+ else {
+ addProperty(writer, "Order", "(none)", true);
+ }
+ if (lp.getGroupedFields() != null && lp.getGroupedFields().size() > 0) {
+ addProperty(writer, "Grouped on", lp.getGroupedFields().toString(), false);
+ } else {
+ addProperty(writer, "Grouping", "not grouped", false);
+ }
+ if (n.getUniqueFields() == null || n.getUniqueFields().size() == 0) {
+ addProperty(writer, "Uniqueness", "not unique", false);
+ }
+ else {
+ addProperty(writer, "Uniqueness", n.getUniqueFields().toString(), false);
+ }
+
+ writer.print("\n\t\t]");
+ }
+
+ // output node size estimates
+ writer.print(",\n\t\t\"estimates\": [\n");
+
+ addProperty(writer, "Est. Output Size", n.getEstimatedOutputSize() == -1 ? "(unknown)"
+ : formatNumber(n.getEstimatedOutputSize(), "B"), true);
+ addProperty(writer, "Est. Cardinality", n.getEstimatedNumRecords() == -1 ? "(unknown)"
+ : formatNumber(n.getEstimatedNumRecords()), false);
+
+ writer.print("\t\t]");
+
+ // output node cost
+ if (p.getNodeCosts() != null) {
+ writer.print(",\n\t\t\"costs\": [\n");
+
+ addProperty(writer, "Network", p.getNodeCosts().getNetworkCost() == -1 ? "(unknown)"
+ : formatNumber(p.getNodeCosts().getNetworkCost(), "B"), true);
+ addProperty(writer, "Disk I/O", p.getNodeCosts().getDiskCost() == -1 ? "(unknown)"
+ : formatNumber(p.getNodeCosts().getDiskCost(), "B"), false);
+ addProperty(writer, "CPU", p.getNodeCosts().getCpuCost() == -1 ? "(unknown)"
+ : formatNumber(p.getNodeCosts().getCpuCost(), ""), false);
+
+ addProperty(writer, "Cumulative Network",
+ p.getCumulativeCosts().getNetworkCost() == -1 ? "(unknown)" : formatNumber(p
+ .getCumulativeCosts().getNetworkCost(), "B"), false);
+ addProperty(writer, "Cumulative Disk I/O",
+ p.getCumulativeCosts().getDiskCost() == -1 ? "(unknown)" : formatNumber(p
+ .getCumulativeCosts().getDiskCost(), "B"), false);
+ addProperty(writer, "Cumulative CPU",
+ p.getCumulativeCosts().getCpuCost() == -1 ? "(unknown)" : formatNumber(p
+ .getCumulativeCosts().getCpuCost(), ""), false);
+
+ writer.print("\n\t\t]");
+ }
+
+ // output the node compiler hints
+ if (n.getOperator().getCompilerHints() != null) {
+ CompilerHints hints = n.getOperator().getCompilerHints();
+ CompilerHints defaults = new CompilerHints();
+
+ String size = hints.getOutputSize() == defaults.getOutputSize() ? "(none)" : String.valueOf(hints.getOutputSize());
+ String card = hints.getOutputCardinality() == defaults.getOutputCardinality() ? "(none)" : String.valueOf(hints.getOutputCardinality());
+ String width = hints.getAvgOutputRecordSize() == defaults.getAvgOutputRecordSize() ? "(none)" : String.valueOf(hints.getAvgOutputRecordSize());
+ String filter = hints.getFilterFactor() == defaults.getFilterFactor() ? "(none)" : String.valueOf(hints.getFilterFactor());
+
+ writer.print(",\n\t\t\"compiler_hints\": [\n");
+
+ addProperty(writer, "Output Size (bytes)", size, true);
+ addProperty(writer, "Output Cardinality", card, false);
+ addProperty(writer, "Avg. Output Record Size (bytes)", width, false);
+ addProperty(writer, "Filter Factor", filter, false);
+
+ writer.print("\t\t]");
+ }
+
+ // finish node
+ writer.print("\n\t}");
+ return true;
+ }
+
+ private void addProperty(PrintWriter writer, String name, String value, boolean first) {
+ if (!first) {
+ writer.print(",\n");
+ }
+ writer.print("\t\t\t{ \"name\": \"");
+ writer.print(name);
+ writer.print("\", \"value\": \"");
+ writer.print(value);
+ writer.print("\" }");
+ }
+
+ public static final String formatNumber(double number) {
+ return formatNumber(number, "");
+ }
+
+ public static final String formatNumber(double number, String suffix) {
+ if (number <= 0.0) {
+ return String.valueOf(number);
+ }
+
+ int power = (int) Math.ceil(Math.log10(number));
+
+ int group = (power - 1) / 3;
+ if (group >= SIZE_SUFFIXES.length) {
+ group = SIZE_SUFFIXES.length - 1;
+ } else if (group < 0) {
+ group = 0;
+ }
+
+ // truncate fractional part
+ int beforeDecimal = power - group * 3;
+ if (power > beforeDecimal) {
+ for (int i = power - beforeDecimal; i > 0; i--) {
+ number /= 10;
+ }
+ }
+
+ // attach the size suffix and the caller-supplied unit (e.g. "B" for bytes)
+ return group > 0 ?
+ String.format(Locale.US, "%.2f %c%s", number, SIZE_SUFFIXES[group], suffix) :
+ String.format(Locale.US, "%.2f%s", number, suffix.isEmpty() ? "" : " " + suffix);
+ }
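+ // Examples, given the suffix handling above:
+ //   formatNumber(950, "B")           -> "950.00 B"
+ //   formatNumber(1234567, "B")       -> "1.23 MB"
+ //   formatNumber(23500000000.0, "B") -> "23.50 GB"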
+
+ private static final char[] SIZE_SUFFIXES = { 0, 'K', 'M', 'G', 'T' };
+}